diff --git a/common-api/src/main/scala/pl/touk/nussknacker/engine/api/Comment.scala b/common-api/src/main/scala/pl/touk/nussknacker/engine/api/Comment.scala
index 891a96638d0..635f8c8d898 100644
--- a/common-api/src/main/scala/pl/touk/nussknacker/engine/api/Comment.scala
+++ b/common-api/src/main/scala/pl/touk/nussknacker/engine/api/Comment.scala
@@ -7,7 +7,8 @@ class Comment private (val content: String) extends AnyVal {
object Comment {
def from(content: String): Option[Comment] = {
- if (content.isEmpty) None else Some(new Comment(content))
+ val trimmedContent = content.trim
+ if (trimmedContent.nonEmpty) Some(new Comment(trimmedContent)) else None
}
def unsafeFrom(content: String): Comment = {
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala
index ff2e663f036..a669b48acbb 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala
@@ -272,7 +272,7 @@ class ScenarioActivityApiHttpService(
private def toDto(scenarioComment: ScenarioComment): Dtos.ScenarioActivityComment = scenarioComment match {
case ScenarioComment.WithContent(comment, _, _) =>
Dtos.ScenarioActivityComment(
- content = Dtos.ScenarioActivityCommentContent.Available(comment),
+ content = Dtos.ScenarioActivityCommentContent.Available(comment.content),
lastModifiedBy = scenarioComment.lastModifiedByUserName.value,
lastModifiedAt = scenarioComment.lastModifiedAt,
)
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/initialization/Initialization.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/initialization/Initialization.scala
index caddec0891a..8e90c525c83 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/initialization/Initialization.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/initialization/Initialization.scala
@@ -118,11 +118,10 @@ class AutomaticMigration(
.sequenceOption(for {
migrator <- migrators.forProcessingType(processDetails.processingType)
migrationResult <- migrator.migrateProcess(processDetails, skipEmptyMigrations = true)
- automaticUpdateAction = migrationResult
- .toAutomaticProcessUpdateAction(
- processDetails.processId,
- processDetails.scenarioLabels.map(ScenarioLabel.apply)
- )
+ automaticUpdateAction <- migrationResult.toAutomaticProcessUpdateAction(
+ processDetails.processId,
+ processDetails.scenarioLabels.map(ScenarioLabel.apply)
+ )
} yield {
processRepository.performAutomaticUpdate(automaticUpdateAction)
})
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ScenarioActivityAuditLog.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ScenarioActivityAuditLog.scala
index 12c1dadd1f9..1ae4b861530 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ScenarioActivityAuditLog.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ScenarioActivityAuditLog.scala
@@ -182,7 +182,7 @@ object ScenarioActivityAuditLog {
}
private def stringify(comment: ScenarioComment): String = comment match {
- case ScenarioComment.WithContent(comment, _, _) => comment
+ case ScenarioComment.WithContent(comment, _, _) => comment.content
case ScenarioComment.WithoutContent(_, _) => "none"
}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigrator.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigrator.scala
index 51a2cf32a67..9d090465fff 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigrator.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigrator.scala
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.ui.process.migrate
+import cats.data.NonEmptyList
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
@@ -11,15 +12,23 @@ import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
final case class MigrationResult(process: CanonicalProcess, migrationsApplied: List[ProcessMigration]) {
- def toAutomaticProcessUpdateAction(processId: ProcessId, labels: List[ScenarioLabel]): AutomaticProcessUpdateAction =
- AutomaticProcessUpdateAction(
- processId = processId,
- canonicalProcess = process,
- labels = labels,
- increaseVersionWhenJsonNotChanged = true,
- forwardedUserName = None,
- migrationsApplies = migrationsApplied
- )
+ def toAutomaticProcessUpdateAction(
+ processId: ProcessId,
+ labels: List[ScenarioLabel]
+ ): Option[AutomaticProcessUpdateAction] = {
+ NonEmptyList
+ .fromList(migrationsApplied)
+ .map { migrations =>
+ AutomaticProcessUpdateAction(
+ processId = processId,
+ canonicalProcess = process,
+ labels = labels,
+ increaseVersionWhenJsonNotChanged = true,
+ forwardedUserName = None,
+ migrationsApplied = migrations
+ )
+ }
+ }
}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala
index 41ead0937df..aeff6e772ad 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala
@@ -75,10 +75,7 @@ class ActivityService(
user = loggedUser.scenarioUser,
date = now,
scenarioVersionId = Some(ScenarioVersionId.from(scenarioGraphVersionId)),
- comment = commentOpt match {
- case Some(comment) => ScenarioComment.WithContent(comment.content, UserName(loggedUser.username), now)
- case None => ScenarioComment.WithoutContent(UserName(loggedUser.username), now)
- },
+ comment = ScenarioComment.from(commentOpt, UserName(loggedUser.username), now),
result = DeploymentResult.Success(clock.instant()),
)
)
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala
index 8592ad868ae..505ec16d56f 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.ui.process.repository
import akka.http.scaladsl.model.HttpHeader
+import cats.data.NonEmptyList
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import io.circe.generic.JsonCodec
@@ -98,7 +99,7 @@ object ProcessRepository {
labels: List[ScenarioLabel],
increaseVersionWhenJsonNotChanged: Boolean,
forwardedUserName: Option[RemoteUserName],
- migrationsApplies: List[ProcessMigration]
+ migrationsApplied: NonEmptyList[ProcessMigration]
) extends ModifyProcessAction
final case class ProcessUpdated(processId: ProcessId, oldVersion: Option[VersionId], newVersion: Option[VersionId])
@@ -210,19 +211,11 @@ class DBProcessRepository(
date = Instant.now(),
previousScenarioVersionId = oldVersionId.map(ScenarioVersionId.from),
scenarioVersionId = versionId.map(ScenarioVersionId.from),
- comment = updateProcessAction.comment.map(_.content) match {
- case Some(content) =>
- ScenarioComment.WithContent(
- comment = content,
- lastModifiedByUserName = UserName(loggedUser.username),
- lastModifiedAt = clock.instant(),
- )
- case None =>
- ScenarioComment.WithoutContent(
- lastModifiedByUserName = UserName(loggedUser.username),
- lastModifiedAt = clock.instant(),
- )
- },
+ comment = ScenarioComment.from(
+ content = updateProcessAction.comment,
+ lastModifiedByUserName = UserName(loggedUser.username),
+ lastModifiedAt = clock.instant(),
+ )
)
)
}
@@ -259,7 +252,7 @@ class DBProcessRepository(
user = loggedUser.scenarioUser,
date = Instant.now(),
scenarioVersionId = versionId.map(ScenarioVersionId.from),
- changes = automaticProcessUpdateAction.migrationsApplies.map(_.description).mkString(", "),
+ changes = automaticProcessUpdateAction.migrationsApplied.map(_.description).toList.mkString(", "),
)
)
}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala
index d4fa7cf8931..69623e63bb7 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala
@@ -3,6 +3,7 @@ package pl.touk.nussknacker.ui.process.repository.activities
import cats.implicits.catsSyntaxEitherId
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances.DB
+import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.component.ProcessingMode
import pl.touk.nussknacker.engine.api.deployment.ScenarioAttachment.{AttachmentFilename, AttachmentId}
import pl.touk.nussknacker.engine.api.deployment._
@@ -292,8 +293,7 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
toComment(
id,
activity,
- ScenarioComment
- .WithContent(s"Rename: [${activity.oldName}] -> [${activity.newName}]", UserName(""), activity.date),
+ ScenarioComment.from(s"Rename: [${activity.oldName}] -> [${activity.newName}]", UserName(""), activity.date),
None
)
case activity: ScenarioActivity.CommentAdded =>
@@ -306,8 +306,8 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
toComment(
id,
activity,
- ScenarioComment.WithContent(
- comment = s"Scenario migrated from ${activity.sourceEnvironment.name} by ${activity.sourceUser.value}",
+ ScenarioComment.from(
+ content = s"Scenario migrated from ${activity.sourceEnvironment.name} by ${activity.sourceUser.value}",
lastModifiedByUserName = activity.user.name,
lastModifiedAt = activity.date
),
@@ -323,8 +323,8 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
toComment(
id,
activity,
- ScenarioComment.WithContent(
- comment = s"Migrations applied: ${activity.changes}",
+ ScenarioComment.from(
+ content = s"Migrations applied: ${activity.changes}",
lastModifiedByUserName = activity.user.name,
lastModifiedAt = activity.date
),
@@ -505,8 +505,8 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
private def comment(scenarioComment: ScenarioComment): Option[String] = {
scenarioComment match {
- case ScenarioComment.WithContent(comment, _, _) if comment.nonEmpty => Some(comment.value)
- case _ => None
+ case ScenarioComment.WithContent(comment, _, _) => Some(comment.content)
+ case ScenarioComment.WithoutContent(_, _) => None
}
}
@@ -676,21 +676,11 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
for {
lastModifiedByUserName <- entity.lastModifiedByUserName.toRight("Missing lastModifiedByUserName field")
lastModifiedAt <- entity.lastModifiedAt.toRight("Missing lastModifiedAt field")
- } yield {
- entity.comment match {
- case Some(comment) if comment.nonEmpty =>
- ScenarioComment.WithContent(
- comment = comment,
- lastModifiedByUserName = UserName(lastModifiedByUserName),
- lastModifiedAt = lastModifiedAt.toInstant
- )
- case Some(_) | None =>
- ScenarioComment.WithoutContent(
- lastModifiedByUserName = UserName(lastModifiedByUserName),
- lastModifiedAt = lastModifiedAt.toInstant
- )
- }
- }
+ } yield ScenarioComment.from(
+ content = entity.comment.flatMap(Comment.from),
+ lastModifiedByUserName = UserName(lastModifiedByUserName),
+ lastModifiedAt = lastModifiedAt.toInstant
+ )
}
private def attachmentFromEntity(entity: ScenarioActivityEntityData): Either[String, ScenarioAttachment] = {
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala
index d0c842a8dfa..563a89c9fad 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.ui.process.repository.activities
import db.util.DBIOActionInstances.DB
+import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy
@@ -41,8 +42,8 @@ trait ScenarioActivityRepository {
user = user.scenarioUser,
date = now,
scenarioVersionId = Some(ScenarioVersionId.from(processVersionId)),
- comment = ScenarioComment.WithContent(
- comment = comment,
+ comment = ScenarioComment.from(
+ content = comment,
lastModifiedByUserName = UserName(user.username),
lastModifiedAt = now,
)
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/util/PdfExporter.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/util/PdfExporter.scala
index cfaa3ed0750..a0e7fae5647 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/util/PdfExporter.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/util/PdfExporter.scala
@@ -87,7 +87,7 @@ object PdfExporter extends LazyLogging {
out.toByteArray
}
- private def prepareFopXml(
+ private[util] def prepareFopXml(
svg: String,
processDetails: ScenarioWithDetailsEntity[ScenarioGraph],
processActivity: ProcessActivity,
@@ -204,21 +204,23 @@ object PdfExporter extends LazyLogging {
private def nodeDetails(node: NodeData) = {
- val nodeData = node match {
- case Source(_, SourceRef(typ, params), _) => ("Type", typ) :: params.map(p => (p.name, p.expression.expression))
- case Filter(_, expression, _, _) => List(("Expression", expression.expression))
+ val nodeData: List[(String, String)] = node match {
+ case Source(_, SourceRef(typ, params), _) =>
+ ("Type", typ) :: params.map(p => (p.name.value, p.expression.expression))
+ case Filter(_, expression, _, _) => List(("Expression", expression.expression))
case Enricher(_, ServiceRef(typ, params), output, _) =>
- ("Type", typ) :: ("Output", output) :: params.map(p => (p.name, p.expression.expression))
+ ("Type", typ) :: ("Output", output) :: params.map(p => (p.name.value, p.expression.expression))
// TODO: what about Swtich??
case Switch(_, expression, exprVal, _) => expression.map(e => ("Expression", e.expression)).toList
case Processor(_, ServiceRef(typ, params), _, _) =>
- ("Type", typ) :: params.map(p => (p.name, p.expression.expression))
- case Sink(_, SinkRef(typ, params), _, _, _) => ("Type", typ) :: params.map(p => (p.name, p.expression.expression))
+ ("Type", typ) :: params.map(p => (p.name.value, p.expression.expression))
+ case Sink(_, SinkRef(typ, params), _, _, _) =>
+ ("Type", typ) :: params.map(p => (p.name.value, p.expression.expression))
case CustomNode(_, output, typ, params, _) =>
- ("Type", typ) :: ("Output", output.getOrElse("")) :: params.map(p => (p.name, p.expression.expression))
+ ("Type", typ) :: ("Output", output.getOrElse("")) :: params.map(p => (p.name.value, p.expression.expression))
case FragmentInput(_, FragmentRef(typ, params, _), _, _, _) =>
- ("Type", typ) :: params.map(p => (p.name, p.expression.expression))
- case FragmentInputDefinition(_, parameters, _) => parameters.map(p => p.name -> p.typ.refClazzName)
+ ("Type", typ) :: params.map(p => (p.name.value, p.expression.expression))
+ case FragmentInputDefinition(_, parameters, _) => parameters.map(p => p.name.value -> p.typ.refClazzName)
case FragmentOutputDefinition(_, outputName, fields, _) =>
("Output name", outputName) :: fields.map(p => p.name -> p.expression.expression)
case Variable(_, name, expr, _) => (name -> expr.expression) :: Nil
@@ -226,15 +228,15 @@ object PdfExporter extends LazyLogging {
("Variable name", name) :: fields.map(p => p.name -> p.expression.expression)
case Join(_, output, typ, parameters, branch, _) =>
("Type", typ) :: ("Output", output.getOrElse("")) ::
- parameters.map(p => p.name -> p.expression.expression) ++ branch.flatMap(bp =>
- bp.parameters.map(p => s"${bp.branchId} - ${p.name}" -> p.expression.expression)
+ parameters.map(p => p.name.value -> p.expression.expression) ++ branch.flatMap(bp =>
+ bp.parameters.map(p => s"${bp.branchId} - ${p.name.value}" -> p.expression.expression)
)
case Split(_, _) => ("No parameters", "") :: Nil
// This should not happen in properly resolved scenario...
case _: BranchEndData => throw new IllegalArgumentException("Should not happen during PDF export")
case _: FragmentUsageOutput => throw new IllegalArgumentException("Should not happen during PDF export")
}
- val data = node.additionalFields
+ val data: List[(String, String)] = node.additionalFields
.flatMap(_.description)
.map(naf => ("Description", naf))
.toList ++ nodeData
@@ -250,7 +252,7 @@ object PdfExporter extends LazyLogging {
{
- data.map { case (key, value) =>
+ data.map { case (key: String, value: String) =>
diff --git a/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala b/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala
index 7e6cfd9aab3..bc9c38f453f 100644
--- a/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala
+++ b/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala
@@ -123,7 +123,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
user = user,
date = date,
scenarioVersionId = sv,
- comment = WithContent("Deployment with scenario fix", user.name, date),
+ comment = ScenarioComment.from("Deployment with scenario fix", user.name, date),
result = DeploymentResult.Success(date),
)
)
@@ -139,7 +139,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
user = user,
date = date,
scenarioVersionId = sv,
- comment = WithContent("I'm canceling this scenario, it causes problems", user.name, date),
+ comment = ScenarioComment.from("I'm canceling this scenario, it causes problems", user.name, date),
result = DeploymentResult.Success(date),
)
)
@@ -183,7 +183,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
user = user,
date = date,
scenarioVersionId = sv,
- comment = WithContent("Paused because marketing campaign is paused for now", user.name, date),
+ comment = ScenarioComment.from("Paused because marketing campaign is paused for now", user.name, date),
result = DeploymentResult.Success(date),
)
)
@@ -239,7 +239,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
user = user,
date = date,
scenarioVersionId = sv,
- comment = WithContent("Deployed at the request of business", user.name, date),
+ comment = ScenarioComment.from("Deployed at the request of business", user.name, date),
result = DeploymentResult.Success(date),
)
)
@@ -256,7 +256,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
date = date,
scenarioVersionId = sv,
actionName = "special action",
- comment = WithContent("Special action needed to be executed", user.name, date),
+ comment = ScenarioComment.from("Special action needed to be executed", user.name, date),
result = DeploymentResult.Success(date),
)
)
@@ -281,7 +281,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
user = ScenarioUser(None, UserName("John Doe"), None, None),
date = now.toInstant,
scenarioVersionId = Some(ScenarioVersionId(processVersionId)),
- comment = WithContent("ABC1", UserName(user), now.toInstant)
+ comment = ScenarioComment.from("ABC1", UserName(user), now.toInstant)
),
ScenarioActivity.CommentAdded(
scenarioId = ScenarioId(process.id.value),
@@ -289,7 +289,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities
user = ScenarioUser(None, UserName("John Doe"), None, None),
date = now.toInstant,
scenarioVersionId = Some(ScenarioVersionId(processVersionId)),
- comment = WithContent("ABC2", UserName(user), now.toInstant)
+ comment = ScenarioComment.from("ABC2", UserName(user), now.toInstant)
)
)
}
diff --git a/designer/server/src/test/scala/db/migration/V1_058__UpdateAndAddMissingScenarioActivitiesSpec.scala b/designer/server/src/test/scala/db/migration/V1_058__UpdateAndAddMissingScenarioActivitiesSpec.scala
index b34270796f6..7c4fa9204c8 100644
--- a/designer/server/src/test/scala/db/migration/V1_058__UpdateAndAddMissingScenarioActivitiesSpec.scala
+++ b/designer/server/src/test/scala/db/migration/V1_058__UpdateAndAddMissingScenarioActivitiesSpec.scala
@@ -69,8 +69,8 @@ class V1_058__UpdateAndAddMissingScenarioActivitiesSpec
),
date = Instant.now,
scenarioVersionId = None,
- comment = ScenarioComment.WithContent(
- comment = "Scenario migrated from TEST_ENV by test env user",
+ comment = ScenarioComment.from(
+ content = "Scenario migrated from TEST_ENV by test env user",
lastModifiedByUserName = UserName(loggedUser.username),
lastModifiedAt = Instant.now
)
@@ -128,8 +128,8 @@ class V1_058__UpdateAndAddMissingScenarioActivitiesSpec
date = Instant.now,
previousScenarioVersionId = None,
scenarioVersionId = None,
- comment = ScenarioComment.WithContent(
- comment = "Scenario migrated from TEST_ENV by test env user",
+ comment = ScenarioComment.from(
+ content = "Scenario migrated from TEST_ENV by test env user",
lastModifiedByUserName = UserName(loggedUser.username),
lastModifiedAt = Instant.now
)
@@ -177,8 +177,8 @@ class V1_058__UpdateAndAddMissingScenarioActivitiesSpec
),
date = Instant.now,
scenarioVersionId = None,
- comment = ScenarioComment.WithContent(
- comment = "Migrations applied: feature A\\nfeature B\\nfeature C",
+ comment = ScenarioComment.from(
+ content = "Migrations applied: feature A\\nfeature B\\nfeature C",
lastModifiedByUserName = UserName(loggedUser.username),
lastModifiedAt = Instant.now
)
@@ -226,8 +226,8 @@ class V1_058__UpdateAndAddMissingScenarioActivitiesSpec
date = Instant.now,
previousScenarioVersionId = None,
scenarioVersionId = None,
- comment = ScenarioComment.WithContent(
- comment = "Migrations applied: feature A\\nfeature B\\nfeature C",
+ comment = ScenarioComment.from(
+ content = "Migrations applied: feature A\\nfeature B\\nfeature C",
lastModifiedByUserName = UserName(loggedUser.username),
lastModifiedAt = Instant.now
)
@@ -274,8 +274,8 @@ class V1_058__UpdateAndAddMissingScenarioActivitiesSpec
),
date = Instant.now,
scenarioVersionId = None,
- comment = ScenarioComment.WithContent(
- comment = "Rename: [oldUglyName] -> [newPrettyName]",
+ comment = ScenarioComment.from(
+ content = "Rename: [oldUglyName] -> [newPrettyName]",
lastModifiedByUserName = UserName(loggedUser.username),
lastModifiedAt = Instant.now
)
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala
index cca9d50b313..dcd25483e6d 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala
@@ -102,8 +102,8 @@ class MockDeploymentManager(
date = Instant.now(),
scenarioVersionId = Some(ScenarioVersionId.from(processVersion.versionId)),
actionName = "Custom action of MockDeploymentManager just before deployment",
- comment = ScenarioComment.WithContent(
- comment = "With comment from DeploymentManager",
+ comment = ScenarioComment.from(
+ content = "With comment from DeploymentManager",
lastModifiedByUserName = ScenarioUser.internalNuUser.name,
lastModifiedAt = Instant.now()
),
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala
index a6229260ba5..dd99b751867 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala
@@ -403,8 +403,8 @@ class ScenarioActivityApiHttpServiceBusinessSpec
date = clock.instant(),
scenarioVersionId = None,
actionName = "Custom action handled by deployment manager",
- comment = ScenarioComment.WithContent(
- comment = "Executed on custom deployment manager",
+ comment = ScenarioComment.from(
+ content = "Executed on custom deployment manager",
lastModifiedByUserName = UserName("custom-user"),
lastModifiedAt = clock.instant()
),
@@ -417,8 +417,8 @@ class ScenarioActivityApiHttpServiceBusinessSpec
date = clock.instant(),
scenarioVersionId = None,
actionName = "Custom action handled by deployment manager",
- comment = ScenarioComment.WithContent(
- comment = "Executed on custom deployment manager",
+ comment = ScenarioComment.from(
+ content = "Executed on custom deployment manager",
lastModifiedByUserName = UserName("custom-user"),
lastModifiedAt = clock.instant()
),
@@ -481,8 +481,8 @@ class ScenarioActivityApiHttpServiceBusinessSpec
user = ScenarioUser(None, UserName("custom-user"), None, None),
date = clock.instant(),
scenarioVersionId = None,
- comment = ScenarioComment.WithContent(
- comment = "Immediate execution",
+ comment = ScenarioComment.from(
+ content = "Immediate execution",
lastModifiedByUserName = UserName("custom-user"),
lastModifiedAt = clock.instant()
),
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala
index f6da26de30c..88c0bb1cab4 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala
@@ -402,11 +402,11 @@ class DeploymentServiceSpec
_,
_,
actionName,
- ScenarioComment.WithContent(content, _, _),
+ ScenarioComment.WithContent(comment, _, _),
result,
) =>
actionName shouldBe "Custom action of MockDeploymentManager just before deployment"
- content shouldBe "With comment from DeploymentManager"
+ comment.content shouldBe "With comment from DeploymentManager"
result shouldBe DeploymentResult.Success(result.dateFinished)
case _ => fail("Third activity should be CustomAction with comment")
}
@@ -943,23 +943,37 @@ class DeploymentServiceSpec
val processName: ProcessName = generateProcessName
val processIdWithName = prepareProcess(processName).dbioActionValues
val actionName = ScenarioActionName("hello")
- val comment = Comment.from("not empty comment")
+ val nonEmptyCommentContent = "not empty comment"
+ val comments = List(
+ Comment.from(nonEmptyCommentContent),
+ Comment.from(""),
+ Comment.from(" "),
+ )
- val result =
+ val results = comments.map { comment =>
deploymentService
.processCommand(
CustomActionCommand(CommonCommandData(processIdWithName, comment, user), actionName, Map.empty)
)
.futureValue
+ }
eventually {
- result shouldBe CustomActionResult("Hi")
- val action =
- actionRepository.getFinishedProcessActions(processIdWithName.id, Some(Set(actionName))).dbioActionValues
-
- action.loneElement.state shouldBe ProcessActionState.Finished
- action.loneElement.comment shouldBe comment.map(_.content)
- listener.events.toArray.filter(_.isInstanceOf[OnActionSuccess]) should have length 1
+ all(results) shouldBe CustomActionResult("Hi")
+ val actions =
+ actionRepository
+ .getFinishedProcessActions(processIdWithName.id, Some(Set(actionName)))
+ .dbioActionValues
+ .sortBy(_.createdAt)
+
+ actions.size shouldBe 3
+ all(actions.map(_.state)) shouldBe ProcessActionState.Finished
+ actions.map(_.comment) shouldBe List(
+ Some(nonEmptyCommentContent),
+ None,
+ None,
+ )
+ listener.events.toArray.filter(_.isInstanceOf[OnActionSuccess]) should have length 3
}
}
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigratorSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigratorSpec.scala
index 5acd1cc532b..d7dc89ad201 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigratorSpec.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/migrate/ProcessModelMigratorSpec.scala
@@ -40,8 +40,8 @@ class ProcessModelMigratorSpec extends AnyFlatSpec with BeforeAndAfterEach with
extractParallelism(migrationResult) shouldBe 11
- migrationResult.toAutomaticProcessUpdateAction(ProcessId(1L), List.empty).migrationsApplies shouldBe
- migrationResult.migrationsApplied
+ migrationResult.toAutomaticProcessUpdateAction(ProcessId(1L), List.empty).map(_.migrationsApplied.toList) shouldBe
+ Some(migrationResult.migrationsApplied)
val processor = extractProcessor(migrationResult)
processor shouldBe ServiceRef(ProcessTestData.otherExistingServiceId, List())
@@ -63,8 +63,8 @@ class ProcessModelMigratorSpec extends AnyFlatSpec with BeforeAndAfterEach with
val processor = extractProcessor(migrationResult)
- migrationResult.toAutomaticProcessUpdateAction(ProcessId(1L), List.empty).migrationsApplies shouldBe
- migrationResult.migrationsApplied
+ migrationResult.toAutomaticProcessUpdateAction(ProcessId(1L), List.empty).map(_.migrationsApplied.toList) shouldBe
+ Some(migrationResult.migrationsApplied)
processor shouldBe ServiceRef(ProcessTestData.existingServiceId, List())
}
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/util/PdfExporterSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/util/PdfExporterSpec.scala
index 2f57c408625..38ffb68030f 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/util/PdfExporterSpec.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/util/PdfExporterSpec.scala
@@ -6,11 +6,15 @@ import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph}
import pl.touk.nussknacker.engine.api.process.{ProcessName, ScenarioVersion, VersionId}
+import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.graph.node.{Filter, UserDefinedAdditionalNodeFields}
+import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion
import pl.touk.nussknacker.engine.util.ResourceLoader
+import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestProcessUtil}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy.{Comment, ProcessActivity}
import pl.touk.nussknacker.ui.process.marshall.CanonicalProcessConverter
+import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
import java.io.FileOutputStream
import java.time.Instant
@@ -52,6 +56,30 @@ class PdfExporterSpec extends AnyFlatSpec with Matchers {
IOUtils.write(exported, new FileOutputStream("/tmp/out.pdf"))
}
+ it should "render parameter names correctly in generated xml" in {
+ val givenSourceParamName = "someParamName"
+ val sampleScenarioGraph =
+ CanonicalProcessConverter.toScenarioGraph(
+ ScenarioBuilder
+ .streaming("foo")
+ .source("sourceId", "sourceType", givenSourceParamName -> "123".spel)
+ .emptySink("sinkId", "sinkType")
+ )
+ val xml = PdfExporter.prepareFopXml(
+ "",
+ createDetails(sampleScenarioGraph),
+ ProcessActivity(List.empty, List.empty),
+ sampleScenarioGraph
+ )
+ val blocks = xml \ "page-sequence" \ "flow" \ "block"
+ val blockWithSourceDetails = blocks(4)
+ val sourceParameterRow = (blockWithSourceDetails \ "table" \ "table-body" \ "table-row")(1)
+ val sourceParameterNameNode = (sourceParameterRow \ "table-cell")(0)
+ val sourceParameterNameText = (sourceParameterNameNode \ "block").text.trim
+
+ sourceParameterNameText shouldBe givenSourceParamName
+ }
+
it should "export empty process to " in {
val scenarioGraph: ScenarioGraph = ScenarioGraph(
ProcessProperties(StreamMetaData()),
diff --git a/docs/Changelog.md b/docs/Changelog.md
index f84a25a6851..cd3132a2435 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -11,6 +11,7 @@
### 1.19.0 (Not released yet)
* [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries
+* [#7116](https://github.com/TouK/nussknacker/pull/7116) Add missing Flink Kafka Source / Sink TypeInformation
## 1.18
@@ -110,6 +111,7 @@
* [#6721](https://github.com/TouK/nussknacker/pull/6721) Provide a popover to display additional information about count
* [#7099](https://github.com/TouK/nussknacker/pull/7099) Provide an option to embedded video to the markdown
* [#7102](https://github.com/TouK/nussknacker/pull/7102) Introduce a new UI to defining aggregations within nodes
+* [#7147](https://github.com/TouK/nussknacker/pull/7147) Fix redundant "ParameterName(...)" wrapper string in node details in exported PDFs
## 1.17
diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md
index 24ae0dbbc3d..308a2d12f8d 100644
--- a/docs/MigrationGuide.md
+++ b/docs/MigrationGuide.md
@@ -2,6 +2,14 @@
To see the biggest differences please consult the [changelog](Changelog.md).
+## In version 1.19.0 (Not released yet)
+
+### Other changes
+
+* [#7116](https://github.com/TouK/nussknacker/pull/7116) Add missing Flink Kafka Source / Sink TypeInformation
+  * We no longer support the old ConsumerRecord constructor used by Flink 1.14 / 1.15
+  * If you used Kafka source/sink components in your scenarios, the state of these scenarios won't be restored
+
## In version 1.18.0 (Not released yet)
### Configuration changes
diff --git a/docs/scenarios_authoring/DesignerTipsAndTricks.md b/docs/scenarios_authoring/DesignerTipsAndTricks.md
index 491322bae92..4c71140ec03 100644
--- a/docs/scenarios_authoring/DesignerTipsAndTricks.md
+++ b/docs/scenarios_authoring/DesignerTipsAndTricks.md
@@ -12,8 +12,8 @@ While Designer GUI in most cases is self-explanatory and forgiving, there are a
* **Selecting nodes** - use Shift button and left mouse button to select nodes. If you expand the selection area rightward, you need to fully mark node icons; alternatively you can mark node icons partially if you expand the selection area leftward.
* **Copy & paste** - you can copy and paste selected nodes; just ensure that your browser has access to the clipboard for the Nussknacker site you use.
* **Tab navigation** - you can navigate inside node details with TAB button.
-* **Panels in the Designer** - there are several panels on the left and right side of the scenario authoring canvass. You can move these panels and collapse them. You can always restore default layout using "reset" button in the View panel.
-* **Scenario versioning** - whenever a scenario is s-aved, a new version is created. You can revert to the older version of the scenario (see Versions panel). Adding comments when saving a scenario may help to find the right version.
+* **Panels in the Designer** - there are several panels on the left and right side of the scenario authoring canvas. You can move these panels and collapse them. You can always restore the default layout using the "reset" button in the View panel.
+* **Scenario versioning** - whenever a scenario is saved, a new version is created. You can revert to an older version of the scenario (see the Activities panel). Adding comments when saving a scenario may help to find the right version.
* **Deleting a scenario** - scenarios can be archived and unarchived, they are never completely deleted.
* **Inserting a node into the flow** - you can drop a node on the edge connecting nodes and Designer will fit it into the flow.
diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala
index 50ad362f158..34bfbee9e3b 100644
--- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala
+++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.api.typeinformation
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import pl.touk.nussknacker.engine.api.context.ValidationContext
+import pl.touk.nussknacker.engine.api.generics.GenericType
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.api.{Context, ValueWithContext}
import pl.touk.nussknacker.engine.util.Implicits.RichStringList
@@ -38,8 +39,14 @@ trait TypeInformationDetection extends Serializable {
forClass(klass)
}
- def forClass[T](klass: Class[T]): TypeInformation[T] =
- forType[T](Typed.typedClass(klass))
+ def forClass[T](klass: Class[T]): TypeInformation[T] = {
+ // Typed.typedClass doesn't support Any
+ if (klass == classOf[Any]) {
+ Types.GENERIC(klass)
+ } else {
+ forType[T](Typed.typedClass(klass))
+ }
+ }
def forType[T](typingResult: TypingResult): TypeInformation[T]
diff --git a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala
index 9aa42b14e6d..dc8899248ac 100644
--- a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala
+++ b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala
@@ -12,11 +12,9 @@ import pl.touk.nussknacker.engine.api.component.{
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory
+import pl.touk.nussknacker.engine.schemedkafka.FlinkUniversalSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{
- UniversalSchemaBasedSerdeProvider,
- UniversalSchemaRegistryClientFactory
-}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.sink.UniversalKafkaSinkFactory
import pl.touk.nussknacker.engine.schemedkafka.sink.flink.FlinkKafkaUniversalSinkImplFactory
import pl.touk.nussknacker.engine.schemedkafka.source.UniversalKafkaSourceFactory
@@ -36,7 +34,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
import docsConfig._
def universal(componentType: ComponentType) = s"DataSourcesAndSinks#kafka-$componentType"
- val universalSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)
+ val universalSerdeProvider = FlinkUniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)
List(
ComponentDefinition(
diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala
index c44b4649096..ebec77566a5 100644
--- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala
+++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala
@@ -14,6 +14,7 @@ import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId}
import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.{
DeploymentStatus,
EngineStatusesToReschedule,
+ FinishedScheduledExecutionMetadata,
MaxDeploymentsStatus,
PeriodicProcessStatus
}
@@ -56,7 +57,7 @@ class PeriodicProcessService(
private val emptyCallback: Callback = () => Future.successful(())
- private implicit val localDateOrdering: Ordering[LocalDateTime] = Ordering.by(identity[ChronoLocalDateTime[_]])
+ private implicit val localDateTimeOrdering: Ordering[LocalDateTime] = Ordering.by(identity[ChronoLocalDateTime[_]])
def getScenarioActivitiesSpecificToPeriodicProcess(
processIdWithName: ProcessIdWithName
@@ -65,17 +66,17 @@ class PeriodicProcessService(
groupedByProcess = schedulesState.groupedByPeriodicProcess
deployments = groupedByProcess.flatMap(_.deployments)
deploymentsWithStatuses = deployments.flatMap(d => scheduledExecutionStatusAndDateFinished(d).map((d, _)))
- activities = deploymentsWithStatuses.map { case (deployment, (status, dateFinished)) =>
+ activities = deploymentsWithStatuses.map { case (deployment, metadata) =>
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(processIdWithName.id.value),
scenarioActivityId = ScenarioActivityId.random,
user = ScenarioUser.internalNuUser,
- date = instantAtSystemDefaultZone(deployment.runAt),
+ date = metadata.dateDeployed.getOrElse(metadata.dateFinished),
scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcess.processVersion.versionId)),
- scheduledExecutionStatus = status,
- dateFinished = dateFinished,
+ scheduledExecutionStatus = metadata.status,
+ dateFinished = metadata.dateFinished,
scheduleName = deployment.scheduleName.display,
- createdAt = instantAtSystemDefaultZone(deployment.createdAt),
+ createdAt = metadata.dateCreated,
nextRetryAt = deployment.nextRetryAt.map(instantAtSystemDefaultZone),
retriesLeft = deployment.nextRetryAt.map(_ => deployment.retriesLeft),
)
@@ -522,9 +523,8 @@ class PeriodicProcessService(
private def scheduledExecutionStatusAndDateFinished(
entity: PeriodicProcessDeployment[Unit],
- ): Option[(ScheduledExecutionStatus, Instant)] = {
+ ): Option[FinishedScheduledExecutionMetadata] = {
for {
- dateFinished <- entity.state.completedAt.map(instantAtSystemDefaultZone)
status <- entity.state.status match {
case PeriodicProcessDeploymentStatus.Scheduled =>
None
@@ -539,7 +539,15 @@ class PeriodicProcessService(
case PeriodicProcessDeploymentStatus.FailedOnDeploy =>
Some(ScheduledExecutionStatus.DeploymentFailed)
}
- } yield (status, dateFinished)
+ dateCreated = instantAtSystemDefaultZone(entity.createdAt)
+ dateDeployed = entity.state.deployedAt.map(instantAtSystemDefaultZone)
+ dateFinished <- entity.state.completedAt.map(instantAtSystemDefaultZone)
+ } yield FinishedScheduledExecutionMetadata(
+ status = status,
+ dateCreated = dateCreated,
+ dateDeployed = dateDeployed,
+ dateFinished = dateFinished
+ )
}
// LocalDateTime's in the context of PeriodicProcess are created using clock with system default timezone
@@ -710,4 +718,11 @@ object PeriodicProcessService {
}
+ private final case class FinishedScheduledExecutionMetadata(
+ status: ScheduledExecutionStatus,
+ dateCreated: Instant,
+ dateDeployed: Option[Instant],
+ dateFinished: Instant,
+ )
+
}
diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala
index e57623b43d1..8c0401c3339 100644
--- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala
+++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala
@@ -378,29 +378,6 @@ class PeriodicDeploymentManagerTest
PeriodicProcessDeploymentStatus.Finished,
PeriodicProcessDeploymentStatus.Scheduled
)
-
- val activities = f.periodicDeploymentManager match {
- case manager: ManagerSpecificScenarioActivitiesStoredByManager =>
- manager.managerSpecificScenarioActivities(idWithName).futureValue
- case _ =>
- List.empty
- }
- val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
- activities shouldBe List(
- ScenarioActivity.PerformedScheduledExecution(
- scenarioId = ScenarioId(1),
- scenarioActivityId = firstActivity.scenarioActivityId,
- user = ScenarioUser(None, UserName("Nussknacker"), None, None),
- date = firstActivity.date,
- scenarioVersionId = Some(ScenarioVersionId(1)),
- dateFinished = firstActivity.dateFinished,
- scheduleName = "[default]",
- scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
- createdAt = firstActivity.createdAt,
- retriesLeft = None,
- nextRetryAt = None
- ),
- )
}
test("should cancel failed job after RescheduleActor handles finished") {
@@ -421,30 +398,6 @@ class PeriodicDeploymentManagerTest
f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed
f.getMergedStatusDetails.status shouldEqual SimpleStateStatus.Canceled
-
- val activities = f.periodicDeploymentManager match {
- case manager: ManagerSpecificScenarioActivitiesStoredByManager =>
- manager.managerSpecificScenarioActivities(idWithName).futureValue
- case _ =>
- List.empty
- }
- val headActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
- activities shouldBe List(
- ScenarioActivity.PerformedScheduledExecution(
- scenarioId = ScenarioId(1),
- scenarioActivityId = headActivity.scenarioActivityId,
- user = ScenarioUser(None, UserName("Nussknacker"), None, None),
- date = headActivity.date,
- scenarioVersionId = Some(ScenarioVersionId(1)),
- dateFinished = headActivity.dateFinished,
- scheduleName = "[default]",
- scheduledExecutionStatus = ScheduledExecutionStatus.Failed,
- createdAt = headActivity.createdAt,
- retriesLeft = None,
- nextRetryAt = None
- )
- )
-
}
test("should reschedule failed job after RescheduleActor handles finished when configured") {
diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala
index e4e0071ef81..8b6bcd8a564 100644
--- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala
+++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala
@@ -16,8 +16,18 @@ import org.scalatest.matchers.should.Matchers
import org.testcontainers.utility.DockerImageName
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
-import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, ProcessActionId, ProcessingTypeActionServiceStub}
-import pl.touk.nussknacker.engine.api.process.ProcessName
+import pl.touk.nussknacker.engine.api.deployment.{
+ DataFreshnessPolicy,
+ ProcessActionId,
+ ProcessingTypeActionServiceStub,
+ ScenarioActivity,
+ ScenarioId,
+ ScenarioUser,
+ ScenarioVersionId,
+ ScheduledExecutionStatus,
+ UserName
+}
+import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.PeriodicProcessStatus
@@ -55,6 +65,8 @@ class PeriodicProcessServiceIntegrationTest
private val processName = ProcessName("test")
+ private val processIdWithName = ProcessIdWithName(ProcessId(1), processName)
+
private val sampleProcess = CanonicalProcess(MetaData(processName.value, StreamMetaData()), Nil)
private val startTime = Instant.parse("2021-04-06T13:18:00Z")
@@ -238,6 +250,24 @@ class PeriodicProcessServiceIntegrationTest
// TODO: we currently don't have Canceled status - to get full information about status someone have to check both state of PeriodicProcess (active/inactive)
// and state of deployment
inactiveStates.firstScheduleData.latestDeployments.head.state.status shouldBe PeriodicProcessDeploymentStatus.Scheduled
+
+ val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue
+ val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
+ activities shouldBe List(
+ ScenarioActivity.PerformedScheduledExecution(
+ scenarioId = ScenarioId(1),
+ scenarioActivityId = firstActivity.scenarioActivityId,
+ user = ScenarioUser(None, UserName("Nussknacker"), None, None),
+ date = firstActivity.date,
+ scenarioVersionId = Some(ScenarioVersionId(1)),
+ dateFinished = firstActivity.dateFinished,
+ scheduleName = "[default]",
+ scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
+ createdAt = firstActivity.createdAt,
+ retriesLeft = None,
+ nextRetryAt = None
+ ),
+ )
}
it should "redeploy scenarios that failed on deploy" in withFixture(deploymentRetryConfig =
@@ -269,6 +299,24 @@ class PeriodicProcessServiceIntegrationTest
service.deploy(toBeRetried).futureValue
service.findToBeDeployed.futureValue.toList shouldBe Nil
+
+ val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue
+ val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
+ activities shouldBe List(
+ ScenarioActivity.PerformedScheduledExecution(
+ scenarioId = ScenarioId(1),
+ scenarioActivityId = firstActivity.scenarioActivityId,
+ user = ScenarioUser(None, UserName("Nussknacker"), None, None),
+ date = firstActivity.date,
+ scenarioVersionId = Some(ScenarioVersionId(1)),
+ dateFinished = firstActivity.dateFinished,
+ scheduleName = "[default]",
+ scheduledExecutionStatus = ScheduledExecutionStatus.DeploymentFailed,
+ createdAt = firstActivity.createdAt,
+ retriesLeft = None,
+ nextRetryAt = None
+ ),
+ )
}
it should "handle multiple schedules" in withFixture() { f =>
@@ -332,6 +380,9 @@ class PeriodicProcessServiceIntegrationTest
service.deactivate(processName).futureValue
service.getLatestDeploymentsForActiveSchedules(processName).futureValue shouldBe empty
+
+ val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue
+ activities shouldBe empty
}
it should "wait until other schedule finishes, before deploying next schedule" in withFixture() { f =>
@@ -375,6 +426,24 @@ class PeriodicProcessServiceIntegrationTest
val toDeployAfterFinish = service.findToBeDeployed.futureValue
toDeployAfterFinish should have length 1
toDeployAfterFinish.head.scheduleName.value.value shouldBe secondSchedule
+
+ val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue
+ val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
+ activities shouldBe List(
+ ScenarioActivity.PerformedScheduledExecution(
+ scenarioId = ScenarioId(1),
+ scenarioActivityId = firstActivity.scenarioActivityId,
+ user = ScenarioUser(None, UserName("Nussknacker"), None, None),
+ date = firstActivity.date,
+ scenarioVersionId = Some(ScenarioVersionId(1)),
+ dateFinished = firstActivity.dateFinished,
+ scheduleName = "schedule1",
+ scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
+ createdAt = firstActivity.createdAt,
+ retriesLeft = None,
+ nextRetryAt = None
+ ),
+ )
}
it should "handle multiple one time schedules" in withFixture() { f =>
@@ -475,6 +544,38 @@ class PeriodicProcessServiceIntegrationTest
.futureValue
inactiveStates.latestDeploymentForSchedule(schedule1).state.status shouldBe PeriodicProcessDeploymentStatus.Finished
inactiveStates.latestDeploymentForSchedule(schedule2).state.status shouldBe PeriodicProcessDeploymentStatus.Finished
+
+ val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue
+ val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
+ val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
+ activities shouldBe List(
+ ScenarioActivity.PerformedScheduledExecution(
+ scenarioId = ScenarioId(1),
+ scenarioActivityId = firstActivity.scenarioActivityId,
+ user = ScenarioUser(None, UserName("Nussknacker"), None, None),
+ date = firstActivity.date,
+ scenarioVersionId = Some(ScenarioVersionId(1)),
+ dateFinished = firstActivity.dateFinished,
+ scheduleName = "schedule1",
+ scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
+ createdAt = firstActivity.createdAt,
+ retriesLeft = None,
+ nextRetryAt = None
+ ),
+ ScenarioActivity.PerformedScheduledExecution(
+ scenarioId = ScenarioId(1),
+ scenarioActivityId = secondActivity.scenarioActivityId,
+ user = ScenarioUser(None, UserName("Nussknacker"), None, None),
+ date = secondActivity.date,
+ scenarioVersionId = Some(ScenarioVersionId(1)),
+ dateFinished = secondActivity.dateFinished,
+ scheduleName = "schedule2",
+ scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
+ createdAt = secondActivity.createdAt,
+ retriesLeft = None,
+ nextRetryAt = None
+ ),
+ )
}
it should "handle failed event handler" in withFixture() { f =>
@@ -542,6 +643,24 @@ class PeriodicProcessServiceIntegrationTest
val stateAfterHandleFinished = service.getLatestDeploymentsForActiveSchedules(processName).futureValue
stateAfterHandleFinished.latestDeploymentForSingleSchedule.state.status shouldBe PeriodicProcessDeploymentStatus.Scheduled
+
+ val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue
+ val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
+ activities shouldBe List(
+ ScenarioActivity.PerformedScheduledExecution(
+ scenarioId = ScenarioId(1),
+ scenarioActivityId = firstActivity.scenarioActivityId,
+ user = ScenarioUser(None, UserName("Nussknacker"), None, None),
+ date = firstActivity.date,
+ scenarioVersionId = Some(ScenarioVersionId(1)),
+ dateFinished = firstActivity.dateFinished,
+ scheduleName = "[default]",
+ scheduledExecutionStatus = ScheduledExecutionStatus.Failed,
+ createdAt = firstActivity.createdAt,
+ retriesLeft = None,
+ nextRetryAt = None
+ ),
+ )
}
private def randomProcessActionId = ProcessActionId(UUID.randomUUID())
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java b/engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..964a2d55dd5
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java
@@ -0,0 +1,44 @@
+package pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * A {@link TypeSerializerSnapshot} for the Scala {@link ConsumerRecordTypeInfo}.
+ */
+public final class ConsumerRecordTypeSerializerSnapshot<K, V>
+        extends CompositeTypeSerializerSnapshot<ConsumerRecord<K, V>, ConsumerRecordSerializer<K, V>> {
+
+    private static final int VERSION = 1;
+
+ public ConsumerRecordTypeSerializerSnapshot() {
+ super();
+ }
+
+    public ConsumerRecordTypeSerializerSnapshot(ConsumerRecordSerializer<K, V> serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return VERSION;
+ }
+
+ @Override
+    protected TypeSerializer<?>[] getNestedSerializers(ConsumerRecordSerializer<K, V> outerSerializer) {
+ return new TypeSerializer[] { outerSerializer.keySerializer(), outerSerializer.valueSerializer() };
+ }
+
+ @Override
+    protected ConsumerRecordSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+        @SuppressWarnings("unchecked")
+        TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
+
+        @SuppressWarnings("unchecked")
+        TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
+
+ return new ConsumerRecordSerializer<>(keySerializer, valueSerializer);
+ }
+}
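For illustration, a minimal Scala sketch of what this snapshot enables, assuming Flink's built-in Types factories for the nested key/value serializers (the values are made up and this is not part of the change itself):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.Types

    // build a ConsumerRecordSerializer with String keys and Long values
    // (uses the same deprecated createSerializer overload as the code above)
    val keySerializer   = Types.STRING.createSerializer(new ExecutionConfig())
    val valueSerializer = Types.LONG.createSerializer(new ExecutionConfig())
    val serializer      = new ConsumerRecordSerializer(keySerializer, valueSerializer)

    // the snapshot captures snapshots of the nested serializers and can restore an equal serializer later,
    // which is what Flink relies on when checking state compatibility after a job restart
    val snapshot = new ConsumerRecordTypeSerializerSnapshot(serializer)
    assert(snapshot.restoreSerializer() == serializer)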
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala
new file mode 100644
index 00000000000..4f14299516e
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala
@@ -0,0 +1,26 @@
+package pl.touk.nussknacker.engine.schemedkafka
+
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.KafkaSchemaRegistryBasedValueSerializationSchemaFactory
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider.createSchemaIdFromMessageExtractor
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalKafkaDeserializerFactory, UniversalSchemaValidator, UniversalSerializerFactory, UniversalToJsonFormatterFactory}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory}
+import pl.touk.nussknacker.engine.schemedkafka.source.flink.FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory
+
+object FlinkUniversalSchemaBasedSerdeProvider {
+
+ def create(schemaRegistryClientFactory: SchemaRegistryClientFactory): SchemaBasedSerdeProvider = {
+ SchemaBasedSerdeProvider(
+ new KafkaSchemaRegistryBasedValueSerializationSchemaFactory(
+ schemaRegistryClientFactory,
+ UniversalSerializerFactory
+ ),
+ new FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
+ schemaRegistryClientFactory,
+ new UniversalKafkaDeserializerFactory(createSchemaIdFromMessageExtractor)
+ ),
+ new UniversalToJsonFormatterFactory(schemaRegistryClientFactory, createSchemaIdFromMessageExtractor),
+ UniversalSchemaValidator
+ )
+ }
+
+}
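For context, a sketch of how this factory might be wired for the Flink engine; the schemaRegistryClientFactory value here is a placeholder, not something introduced by this change:

    // any SchemaRegistryClientFactory implementation can be plugged in here
    val schemaRegistryClientFactory: SchemaRegistryClientFactory = ???

    // bundles the value serializer, the Flink-aware key/value deserializer and the JSON record formatter
    val serdeProvider = FlinkUniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)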
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala
new file mode 100644
index 00000000000..38643da0fe5
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala
@@ -0,0 +1,204 @@
+package pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo
+
+import com.github.ghik.silencer.silent
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+import org.apache.kafka.common.record.TimestampType
+
+import java.util.{Objects, Optional}
+
+class ConsumerRecordTypeInfo[K, V](val keyTypeInfo: TypeInformation[K], val valueTypeInfo: TypeInformation[V])
+ extends TypeInformation[ConsumerRecord[K, V]] {
+
+ override def getTypeClass: Class[ConsumerRecord[K, V]] = classOf[ConsumerRecord[K, V]]
+
+ @silent("deprecated")
+ override def createSerializer(
+ config: org.apache.flink.api.common.ExecutionConfig
+ ): TypeSerializer[ConsumerRecord[K, V]] = {
+ new ConsumerRecordSerializer[K, V](keyTypeInfo.createSerializer(config), valueTypeInfo.createSerializer(config))
+ }
+
+ // ConsumerRecord has 8 simple fields
+ override def getArity: Int = 8
+
+ // TODO: find out what's the correct value here
+ // ConsumerRecord's 8 simple fields (excluding headers, key and value) + 2 fields of Headers + the fields of the key type + the fields of the value type
+ override def getTotalFields: Int = 8 + 2 + keyTypeInfo.getTotalFields + valueTypeInfo.getTotalFields
+
+ override def isKeyType: Boolean = false
+
+ override def isBasicType: Boolean = false
+
+ override def isTupleType: Boolean = false
+
+ override def toString: String =
+ s"ConsumerRecordTypeInfo($keyTypeInfo, $valueTypeInfo)"
+
+ override def canEqual(obj: Any): Boolean =
+ obj.isInstanceOf[ConsumerRecordTypeInfo[_, _]]
+
+ override def equals(obj: Any): Boolean =
+ obj match {
+ case info: ConsumerRecordTypeInfo[_, _] =>
+ keyTypeInfo.equals(info.keyTypeInfo) && valueTypeInfo.equals(info.valueTypeInfo)
+ case _ => false
+ }
+
+ override def hashCode(): Int =
+ Objects.hashCode(keyTypeInfo, valueTypeInfo)
+}
+
+class ConsumerRecordSerializer[K, V](val keySerializer: TypeSerializer[K], val valueSerializer: TypeSerializer[V])
+ extends TypeSerializer[ConsumerRecord[K, V]] {
+
+ override def getLength: Int = -1
+
+ override def isImmutableType: Boolean = true
+
+ override def createInstance(): ConsumerRecord[K, V] =
+ new ConsumerRecord[K, V]("", 0, 0, null.asInstanceOf[K], null.asInstanceOf[V]) // ConsumerRecord rejects a null topic
+
+ override def duplicate(): TypeSerializer[ConsumerRecord[K, V]] = {
+ val keyDuplicated = keySerializer.duplicate()
+ val valueDuplicated = valueSerializer.duplicate()
+
+ if (keyDuplicated.equals(keySerializer) && valueDuplicated.equals(valueSerializer)) {
+ this
+ } else {
+ new ConsumerRecordSerializer(keyDuplicated, valueDuplicated)
+ }
+ }
+
+ override def copy(record: ConsumerRecord[K, V]): ConsumerRecord[K, V] =
+ new ConsumerRecord[K, V](
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.timestamp(),
+ record.timestampType(),
+ ConsumerRecord.NULL_SIZE,
+ ConsumerRecord.NULL_SIZE,
+ record.key(),
+ record.value(),
+ record.headers(),
+ record.leaderEpoch()
+ )
+
+ override def copy(record: ConsumerRecord[K, V], reuse: ConsumerRecord[K, V]): ConsumerRecord[K, V] =
+ copy(record)
+
+ override def copy(source: DataInputView, target: DataOutputView): Unit =
+ serialize(deserialize(source), target)
+
+ override def serialize(record: ConsumerRecord[K, V], target: DataOutputView): Unit = {
+ target.writeUTF(record.topic())
+ target.writeInt(record.partition())
+ target.writeLong(record.offset())
+ target.writeLong(record.timestamp())
+
+ // Short takes less space than int
+ target.writeShort(record.timestampType().id)
+
+ target.writeInt(record.serializedKeySize())
+ target.writeInt(record.serializedValueSize())
+
+ // Serialize the key (can be null)
+ if (record.key() == null) {
+ target.writeBoolean(false)
+ } else {
+ target.writeBoolean(true)
+ keySerializer.serialize(record.key(), target)
+ }
+
+ // Serialize the value (can be null)
+ if (record.value() == null) {
+ target.writeBoolean(false)
+ } else {
+ target.writeBoolean(true)
+ valueSerializer.serialize(record.value(), target)
+ }
+
+ if (record.leaderEpoch().isPresent) {
+ target.writeBoolean(true)
+ target.writeInt(record.leaderEpoch.get())
+ } else {
+ target.writeBoolean(false)
+ }
+
+ target.writeInt(record.headers().toArray.length)
+ record.headers().forEach { header =>
+ target.writeUTF(header.key())
+ target.writeInt(header.value().length)
+ target.write(header.value())
+ }
+ }
+
+ override def deserialize(reuse: ConsumerRecord[K, V], source: DataInputView): ConsumerRecord[K, V] =
+ deserialize(source)
+
+ override def deserialize(source: DataInputView): ConsumerRecord[K, V] = {
+ val topic = source.readUTF()
+ val partition = source.readInt()
+ val offset = source.readLong()
+ val timestamp = source.readLong()
+ val timestampTypeId = source.readShort().toInt
+ val serializedKeySize = source.readInt()
+ val serializedValueSize = source.readInt()
+
+ val key = if (source.readBoolean()) keySerializer.deserialize(source) else null.asInstanceOf[K]
+ val value = if (source.readBoolean()) valueSerializer.deserialize(source) else null.asInstanceOf[V]
+ val leaderEpoch = if (source.readBoolean()) Optional.of[Integer](source.readInt()) else Optional.empty[Integer]
+
+ val headers = (0 until source.readInt()).foldLeft(new RecordHeaders) { (headers, _) =>
+ val name = source.readUTF()
+ val len = source.readInt()
+
+ val headerValue = new Array[Byte](len)
+ source.readFully(headerValue)
+
+ val header = new RecordHeader(name, headerValue)
+ headers.add(header)
+ headers
+ }
+
+ val timestampType =
+ TimestampType
+ .values()
+ .toList
+ .find(_.id == timestampTypeId)
+ .getOrElse(throw new IllegalArgumentException(s"Unknown TimestampType id: $timestampTypeId."))
+
+ new ConsumerRecord[K, V](
+ topic,
+ partition,
+ offset,
+ timestamp,
+ timestampType,
+ serializedKeySize,
+ serializedValueSize,
+ key,
+ value,
+ headers,
+ leaderEpoch
+ )
+ }
+
+ override def snapshotConfiguration(): TypeSerializerSnapshot[ConsumerRecord[K, V]] =
+ new ConsumerRecordTypeSerializerSnapshot(this)
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case other: ConsumerRecordSerializer[_, _] =>
+ keySerializer.equals(other.keySerializer) && valueSerializer.equals(other.valueSerializer)
+ case _ => false
+ }
+ }
+
+ override def hashCode(): Int =
+ Objects.hashCode(keySerializer, valueSerializer)
+
+}
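A short usage sketch for the type info above (illustrative only; Types.STRING / Types.INT stand in for whatever key and value type information a source actually provides):

    import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // explicit element type for a stream of ConsumerRecord[String, java.lang.Integer]
    val recordTypeInfo: TypeInformation[ConsumerRecord[String, Integer]] =
      new ConsumerRecordTypeInfo(Types.STRING, Types.INT)

    // it can then be passed wherever Flink accepts an explicit TypeInformation,
    // e.g. DataStream#map(mapper, recordTypeInfo), instead of relying on reflective type extraction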
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala
index a7563c0ac54..4ef5e35d98a 100644
--- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala
@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink.flink
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
@@ -13,6 +14,7 @@ import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.exception.{ExceptionHandler, WithExceptionHandler}
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink}
+import pl.touk.nussknacker.engine.flink.typeinformation.KeyedValueType
import pl.touk.nussknacker.engine.flink.util.keyed
import pl.touk.nussknacker.engine.flink.util.keyed.KeyedValueMapper
import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema
@@ -40,12 +42,21 @@ class FlinkKafkaUniversalSink(
override def registerSink(
dataStream: DataStream[ValueWithContext[Value]],
flinkNodeContext: FlinkCustomNodeContext
- ): DataStreamSink[_] =
- // FIXME: Missing map TypeInformation
+ ): DataStreamSink[_] = {
+
+ // TODO: Creating TypeInformation for Avro / JSON Schema is difficult because of schema evolution, so we rely on Kryo (e.g. the serializer for GenericRecordWithSchemaId)
+ val typeInfo = KeyedValueType
+ .info(
+ Types.STRING, // the Kafka sink supports only String keys
+ Types.GENERIC(classOf[AnyRef])
+ )
+ .asInstanceOf[TypeInformation[KeyedValue[AnyRef, AnyRef]]]
+
dataStream
- .map(new EncodeAvroRecordFunction(flinkNodeContext))
+ .map(new EncodeAvroRecordFunction(flinkNodeContext), typeInfo)
.filter(_.value != null)
.addSink(toFlinkFunction)
+ }
def prepareValue(
ds: DataStream[Context],
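A brief note on the pattern used in registerSink above (a sketch under the assumption stated in the TODO that Kryo handles value serialization): Types.GENERIC produces a GenericTypeInfo whose serializer is Kryo-based, which is what lets values such as GenericRecordWithSchemaId survive schema evolution here:

    import org.apache.flink.api.common.typeinfo.Types

    // GenericTypeInfo[AnyRef]; its serializer falls back to Kryo rather than a structurally derived schema
    val kryoBackedValueTypeInfo = Types.GENERIC(classOf[AnyRef])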
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala
new file mode 100644
index 00000000000..3045db38a58
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala
@@ -0,0 +1,68 @@
+package pl.touk.nussknacker.engine.schemedkafka.source.flink
+
+import io.confluent.kafka.schemaregistry.ParsedSchema
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.Deserializer
+import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
+import pl.touk.nussknacker.engine.kafka.KafkaConfig
+import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordKafkaDeserializationSchema
+import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchema
+import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
+import pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo.ConsumerRecordTypeInfo
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.{
+ KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory,
+ SchemaRegistryBasedDeserializerFactory
+}
+
+import scala.reflect.ClassTag
+
+class FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
+ schemaRegistryClientFactory: SchemaRegistryClientFactory,
+ deserializerFactory: SchemaRegistryBasedDeserializerFactory
+) extends KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
+ schemaRegistryClientFactory,
+ deserializerFactory
+ ) {
+
+ override def create[K: ClassTag, V: ClassTag](
+ kafkaConfig: KafkaConfig,
+ keySchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
+ valueSchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]]
+ ): KafkaDeserializationSchema[ConsumerRecord[K, V]] = {
+
+ // We mix in ResultTypeQueryable so that Flink uses our TypeInformation instead of reflection-based type extraction
+ new ConsumerRecordKafkaDeserializationSchema[K, V] with ResultTypeQueryable[ConsumerRecord[K, V]] {
+
+ @transient
+ override protected lazy val keyDeserializer: Deserializer[K] =
+ createKeyOrUseStringDeserializer[K](keySchemaDataOpt, kafkaConfig)
+
+ @transient
+ override protected lazy val valueDeserializer: Deserializer[V] =
+ createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig)
+
+ private lazy val typeInformationDetector = TypeInformationDetection.instance
+
+ private lazy val keyTypeInfo: TypeInformation[K] = {
+ if (kafkaConfig.useStringForKey) {
+ Types.STRING.asInstanceOf[TypeInformation[K]]
+ } else {
+ // TODO: Creating TypeInformation for Avro / JSON Schema is difficult because of schema evolution, so we rely on Kryo (e.g. the serializer for GenericRecordWithSchemaId)
+ typeInformationDetector.forClass[K]
+ }
+ }
+
+ private lazy val valueTypeInfo: TypeInformation[V] =
+ // TODO: Creating TypeInformation for Avro / JSON Schema is difficult because of schema evolution, so we rely on Kryo (e.g. the serializer for GenericRecordWithSchemaId)
+ typeInformationDetector.forClass[V]
+
+ override def getProducedType: TypeInformation[ConsumerRecord[K, V]] =
+ new ConsumerRecordTypeInfo(keyTypeInfo, valueTypeInfo)
+
+ }
+ }
+
+}
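To illustrate why ResultTypeQueryable is mixed in (a sketch; the helper name producedTypeOf is hypothetical): consumers of the schema can query the element type directly instead of letting Flink fall back to reflection:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable

    // hypothetical helper: prefer the declared produced type when the schema exposes one
    def producedTypeOf[T](schema: AnyRef): Option[TypeInformation[T]] =
      schema match {
        case queryable: ResultTypeQueryable[T @unchecked] => Some(queryable.getProducedType) // ConsumerRecordTypeInfo here
        case _                                            => None // caller falls back to TypeExtractor-based detection
      }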
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala
new file mode 100644
index 00000000000..bc7335aeb0f
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala
@@ -0,0 +1,118 @@
+package pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.serialization.SerializerConfigImpl
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+import org.apache.kafka.common.record.TimestampType
+import org.scalatest.Assertion
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.prop.TableDrivenPropertyChecks
+import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
+import pl.touk.nussknacker.test.ProcessUtils.convertToAnyShouldWrapper
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.charset.StandardCharsets
+import java.time.LocalDate
+import java.util.Optional
+import scala.reflect.{ClassTag, classTag}
+import scala.util.Random
+
+class ConsumerRecordSerializerSpec extends AnyFunSuite with Matchers with TableDrivenPropertyChecks {
+
+ import pl.touk.nussknacker.test.RandomImplicits._
+
+ private val bufferSize = 1024
+
+ private val serializerConfig = {
+ val executionConfig = new ExecutionConfig()
+ val configuration = FlinkTestConfiguration.configuration()
+ new SerializerConfigImpl(configuration, executionConfig)
+ }
+
+ test("should serialize and deserialize simple record") {
+ val table = Table[ConsumerRecord[_, _]](
+ "record",
+ createConsumerRecord(Random.nextInt(), Random.nextInt()),
+ createConsumerRecord(Random.nextString(), Random.nextString()),
+ createConsumerRecord(Random.nextInt(), Random.nextString()),
+ createConsumerRecord(Random.nextString(), Random.nextInt()),
+ createConsumerRecord(Random.nextString(), Random.nextFloat()),
+ createConsumerRecord(Random.nextString(), Random.nextDouble()),
+ createConsumerRecord(Random.nextString(), Random.nextBoolean()),
+ createConsumerRecord(Random.nextString(), LocalDate.now()),
+ )
+
+ forAll(table) { record =>
+ val results = serializeAndDeserialize(record)
+ compare(results, record)
+ }
+
+ }
+
+ // ConsumerRecord doesn't override hashCode & equals, so we compare field by field
+ private def compare(result: ConsumerRecord[_, _], expected: ConsumerRecord[_, _]): Assertion = {
+ result.topic() shouldBe expected.topic()
+ result.partition() shouldBe expected.partition()
+ result.offset() shouldBe expected.offset()
+ result.timestamp() shouldBe expected.timestamp()
+ result.timestampType() shouldBe expected.timestampType()
+ result.serializedKeySize() shouldBe expected.serializedKeySize()
+ result.serializedValueSize() shouldBe expected.serializedValueSize()
+ result.key() shouldBe expected.key()
+ result.value() shouldBe expected.value()
+ result.headers() shouldBe expected.headers()
+ result.leaderEpoch() shouldBe expected.leaderEpoch()
+ }
+
+ private def createConsumerRecord[K, V](key: K, value: V): ConsumerRecord[K, V] = {
+ val timestampTypes = TimestampType.values().toList
+ val timestampType = timestampTypes(Random.nextInt(timestampTypes.length))
+
+ val leaderEpoch =
+ if (System.currentTimeMillis() % 2 == 0) Optional.empty[Integer]()
+ else Optional.of(Random.nextInt().asInstanceOf[Integer])
+
+ val headers = (0 until Random.nextInt(25)).foldLeft(new RecordHeaders()) { (headers, _) =>
+ headers.add(new RecordHeader(Random.nextString(), Random.nextString().getBytes(StandardCharsets.UTF_8)))
+ headers
+ }
+
+ new ConsumerRecord[K, V](
+ Random.nextString(),
+ Random.nextInt(),
+ Random.nextLong(),
+ Random.nextLong(),
+ timestampType,
+ ConsumerRecord.NULL_SIZE,
+ ConsumerRecord.NULL_SIZE,
+ key,
+ value,
+ headers,
+ leaderEpoch
+ )
+ }
+
+ private def serializeAndDeserialize[K: ClassTag, V: ClassTag](in: ConsumerRecord[K, V]): ConsumerRecord[K, V] = {
+ val keySerializer = getSerializer[K]
+ val valueSerializer = getSerializer[V]
+
+ val serializer = new ConsumerRecordSerializer(keySerializer, valueSerializer)
+
+ val outStream = new ByteArrayOutputStream(bufferSize)
+ val outWrapper = new DataOutputViewStreamWrapper(outStream)
+
+ serializer.serialize(in, outWrapper)
+ serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(outStream.toByteArray)))
+ }
+
+ private def getSerializer[T: ClassTag]: TypeSerializer[T] =
+ TypeExtractor
+ .getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]])
+ .createSerializer(serializerConfig)
+
+}
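The serializer also has explicit branches for null keys and values (the writeBoolean flags in ConsumerRecordSerializer.serialize); a sketch of a case that would exercise them, reusing the spec's helpers:

    // a null key is legal for ConsumerRecord and takes the writeBoolean(false) path
    val recordWithNullKey = createConsumerRecord[String, String](null, "some value")
    compare(serializeAndDeserialize(recordWithNullKey), recordWithNullKey)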
diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala
index 90c68f2503c..a26b728a576 100644
--- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala
+++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.api.deployment
import enumeratum.EnumEntry.UpperSnakecase
import enumeratum.{Enum, EnumEntry}
+import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.component.ProcessingMode
import pl.touk.nussknacker.engine.api.process.VersionId
@@ -45,7 +46,7 @@ sealed trait ScenarioComment {
object ScenarioComment {
final case class WithContent(
- comment: String,
+ comment: Comment,
lastModifiedByUserName: UserName,
lastModifiedAt: Instant,
) extends ScenarioComment
@@ -55,6 +56,33 @@ object ScenarioComment {
lastModifiedAt: Instant,
) extends ScenarioComment
+ def from(
+ content: String,
+ lastModifiedByUserName: UserName,
+ lastModifiedAt: Instant,
+ ): ScenarioComment =
+ from(Comment.from(content), lastModifiedByUserName, lastModifiedAt)
+
+ def from(
+ content: Option[Comment],
+ lastModifiedByUserName: UserName,
+ lastModifiedAt: Instant,
+ ): ScenarioComment = {
+ content match {
+ case Some(content) =>
+ WithContent(
+ comment = content,
+ lastModifiedByUserName = lastModifiedByUserName,
+ lastModifiedAt = lastModifiedAt,
+ )
+ case None =>
+ WithoutContent(
+ lastModifiedByUserName = lastModifiedByUserName,
+ lastModifiedAt = lastModifiedAt,
+ )
+ }
+ }
+
}
sealed trait ScenarioAttachment
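A small usage sketch for the new factory methods (the comment text is illustrative): a non-empty string yields WithContent, while an absent comment yields WithoutContent:

    import java.time.Instant

    val withContent    = ScenarioComment.from("migration applied", UserName("Nussknacker"), Instant.now())
    val withoutContent = ScenarioComment.from(None, UserName("Nussknacker"), Instant.now())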
diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala
index 9a1ef401097..16c7a45babd 100644
--- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala
+++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala
@@ -1,17 +1,14 @@
package pl.touk.nussknacker.engine.definition.test
-import cats.data.NonEmptyList
+import cats.data.Validated.{Invalid, Valid}
+import cats.data.{NonEmptyList, ValidatedNel}
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.ModelData
+import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
import pl.touk.nussknacker.engine.api.definition.Parameter
-import pl.touk.nussknacker.engine.api.process.{
- ComponentUseCase,
- SourceTestSupport,
- TestDataGenerator,
- TestWithParametersSupport
-}
+import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
-import pl.touk.nussknacker.engine.api.{JobData, MetaData, NodeId, ProcessVersion, process}
+import pl.touk.nussknacker.engine.api.{JobData, NodeId, ProcessVersion}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.compile.ExpressionCompiler
import pl.touk.nussknacker.engine.compile.nodecompilation.{LazyParameterCreationStrategy, NodeCompiler}
@@ -82,13 +79,23 @@ class ModelDataTestInfoProvider(modelData: ModelData) extends TestInfoProvider w
}
}
+ // Currently we rely on the assumption that the client calls the scenarioTesting / {scenarioName} / parameters endpoint
+ // only when the scenarioTesting / {scenarioName} / capabilities endpoint returns canTestWithForm = true. Because of that,
+ // we throw UnsupportedOperationException for the non-happy-path cases.
+ // TODO: This assumption is wrong. Every endpoint should be treated separately. Currently, users occasionally
+ // get error notifications because this endpoint is called without checking canTestWithForm = true first.
+ // We could go even further and merge both endpoints.
private def getTestParameters(source: SourceNodeData, jobData: JobData): List[Parameter] =
modelData.withThisAsContextClassLoader {
prepareSourceObj(source)(jobData, NodeId(source.id)) match {
- case Some(s: TestWithParametersSupport[_]) => s.testParametersDefinition
- case _ =>
+ case Valid(s: TestWithParametersSupport[_]) => s.testParametersDefinition
+ case Valid(sourceWithoutTestWithParametersSupport) =>
+ throw new UnsupportedOperationException(
+ s"Requested test parameters from source [${source.id}] of [${sourceWithoutTestWithParametersSupport.getClass.getName}] class that does not implement TestWithParametersSupport."
+ )
+ case Invalid(errors) =>
throw new UnsupportedOperationException(
- s"Requested test parameters from source (${source.id}) that does not implement TestWithParametersSupport."
+ s"Requested test parameters from source [${source.id}] that is not valid. Errors: ${errors.toList.mkString(", ")}"
)
}
}
@@ -118,19 +125,19 @@ class ModelDataTestInfoProvider(modelData: ModelData) extends TestInfoProvider w
val jobData = JobData(scenario.metaData, processVersion)
val generatorsForSourcesSupportingTestDataGeneration = for {
source <- collectAllSources(scenario)
- sourceObj <- prepareSourceObj(source)(jobData, NodeId(source.id))
+ sourceObj <- prepareSourceObj(source)(jobData, NodeId(source.id)).toList
testDataGenerator <- sourceObj.cast[TestDataGenerator]
} yield (NodeId(source.id), testDataGenerator)
NonEmptyList
.fromList(generatorsForSourcesSupportingTestDataGeneration)
.map(Right(_))
- .getOrElse(Left("Scenario doesn't have any source supporting test data generation"))
+ .getOrElse(Left("Scenario doesn't have any valid source supporting test data generation"))
}
private def prepareSourceObj(
source: SourceNodeData
- )(implicit jobData: JobData, nodeId: NodeId): Option[process.Source] = {
- nodeCompiler.compileSource(source).compiledObject.toOption
+ )(implicit jobData: JobData, nodeId: NodeId): ValidatedNel[ProcessCompilationError, Source] = {
+ nodeCompiler.compileSource(source).compiledObject
}
private def generateTestData(generators: NonEmptyList[(NodeId, TestDataGenerator)], size: Int) = {
diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/extension/ToMapConversionExt.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/extension/ToMapConversionExt.scala
index 632c206357b..5a97be28fdd 100644
--- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/extension/ToMapConversionExt.scala
+++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/extension/ToMapConversionExt.scala
@@ -7,6 +7,7 @@ import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.engine.definition.clazz.{FunctionalMethodDefinition, MethodDefinition}
import pl.touk.nussknacker.engine.extension.CastOrConversionExt.{canBeMethodName, orNullSuffix, toMethodName}
import pl.touk.nussknacker.engine.spel.internal.ConversionHandler
+import pl.touk.nussknacker.engine.util.classes.Extensions.ClassExtensions
import java.lang.{Boolean => JBoolean}
import java.util.{Collection => JCollection, HashMap => JHashMap, Map => JMap, Set => JSet}
@@ -47,6 +48,8 @@ object ToMapConversionExt extends ConversionExt(ToMapConversion) {
object ToMapConversion extends ToCollectionConversion[JMap[_, _]] {
+ private val mapClass = classOf[JMap[_, _]]
+
private val keyName = "key"
private val valueName = "value"
private val keyAndValueNames = JSet.of(keyName, valueName)
@@ -62,8 +65,11 @@ object ToMapConversion extends ToCollectionConversion[JMap[_, _]] {
Typed.genericTypeClass[JMap[_, _]](params).validNel
case TypedClass(_, List(TypedObjectTypingResult(_, _, _))) =>
GenericFunctionTypingError.OtherError("List element must contain 'key' and 'value' fields").invalidNel
- case Unknown => Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown)).validNel
- case _ => GenericFunctionTypingError.ArgumentTypeError.invalidNel
+ case TypedClass(_, List(TypedClass(klass, _))) if klass.isAOrChildOf(mapClass) =>
+ Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown)).validNel
+ case TypedClass(_, List(Unknown)) => Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown)).validNel
+ case Unknown => Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown)).validNel
+ case _ => GenericFunctionTypingError.ArgumentTypeError.invalidNel
}
@tailrec
@@ -89,8 +95,8 @@ object ToMapConversion extends ToCollectionConversion[JMap[_, _]] {
private def canConvertToMap(c: JCollection[_]): Boolean = c.isEmpty || c
.stream()
.allMatch {
- case m: JMap[_, _] if !m.isEmpty => m.keySet().containsAll(keyAndValueNames)
- case _ => false
+ case m: JMap[_, _] => m.keySet().containsAll(keyAndValueNames)
+ case _ => false
}
}
diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
index b446d5334d2..6e7331fda8f 100644
--- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
+++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
@@ -148,6 +148,8 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
case class ContainerOfUnknown(value: Any)
+ case class ContainerOfGenericMap(value: JMap[_, _])
+
import pl.touk.nussknacker.engine.util.Implicits._
private def parse[T: TypeTag](
@@ -1609,10 +1611,12 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
result
}
val mapWithDifferentValueTypes = Map("foo" -> "bar", "baz" -> 1).asJava
+ val mapWithKeyAndValueFields = Map("key" -> "foo", "value" -> 123).asJava
val customCtx = ctx
.withVariable("stringMap", stringMap)
.withVariable("mapWithDifferentValueTypes", mapWithDifferentValueTypes)
.withVariable("nullableMap", nullableMap)
+ .withVariable("containerWithMapWithKeyAndValueFields", ContainerOfGenericMap(mapWithKeyAndValueFields))
forAll(
Table(
@@ -1631,6 +1635,16 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
"#nullableMap.![{key: #this.key, value: #this.value}].toMap",
mapStringStringType,
nullableMap
+ ),
+ (
+ "{}.toMap",
+ Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown)),
+ Map.empty.asJava,
+ ),
+ (
+ "{#containerWithMapWithKeyAndValueFields.value}.toMap",
+ Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown)),
+ Map("foo" -> 123).asJava,
)
)
) { (expression, expectedType, expectedResult) =>
@@ -1646,6 +1660,14 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
}
}
+ test("should type conversion of list of unknown to map correctly and return error in runtime") {
+ val parsed = parse[Any]("{1, 'foo', false}.toMap", ctx).validValue
+ parsed.returnType.withoutValue shouldBe Typed.genericTypeClass[JMap[_, _]](List(Unknown, Unknown))
+ an[SpelExpressionEvaluationException] shouldBe thrownBy {
+ parsed.expression.evaluateSync[Any](ctx)
+ }
+ }
+
test("should convert value to a given type") {
val map = Map("a" -> "b").asJava
val emptyMap = Map().asJava
diff --git a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala
index cfc8b264854..d2cf4776338 100644
--- a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala
+++ b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala
@@ -1,11 +1,10 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord
import cats.data.NonEmptyList
-import com.github.ghik.silencer.silent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.api.process.TopicName
-import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchemaFactory
+import pl.touk.nussknacker.engine.kafka.serialization.{KafkaDeserializationSchema, KafkaDeserializationSchemaFactory}
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, serialization}
/**
@@ -28,36 +27,48 @@ abstract class ConsumerRecordDeserializationSchemaFactory[K, V]
topics: NonEmptyList[TopicName.ForSource],
kafkaConfig: KafkaConfig
): serialization.KafkaDeserializationSchema[ConsumerRecord[K, V]] = {
-
- new serialization.KafkaDeserializationSchema[ConsumerRecord[K, V]] {
+ new ConsumerRecordKafkaDeserializationSchema[K, V] {
@transient
- private lazy val keyDeserializer = createKeyDeserializer(kafkaConfig)
+ override protected lazy val keyDeserializer: Deserializer[K] =
+ createKeyDeserializer(kafkaConfig)
+
@transient
- private lazy val valueDeserializer = createValueDeserializer(kafkaConfig)
-
- @silent("deprecated") // using deprecated constructor for Flink 1.14/15 compatibility
- override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = {
- val key = keyDeserializer.deserialize(record.topic(), record.key())
- val value = valueDeserializer.deserialize(record.topic(), record.value())
- new ConsumerRecord[K, V](
- record.topic(),
- record.partition(),
- record.offset(),
- record.timestamp(),
- record.timestampType(),
- ConsumerRecord.NULL_CHECKSUM.longValue(),
- record.serializedKeySize(),
- record.serializedValueSize(),
- key,
- value,
- record.headers(),
- record.leaderEpoch()
- )
- }
-
- override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false
+ override protected lazy val valueDeserializer: Deserializer[V] =
+ createValueDeserializer(kafkaConfig)
+
}
}
}
+
+trait ConsumerRecordKafkaDeserializationSchema[K, V] extends KafkaDeserializationSchema[ConsumerRecord[K, V]] {
+
+ @transient
+ protected val keyDeserializer: Deserializer[K]
+
+ @transient
+ protected val valueDeserializer: Deserializer[V]
+
+ override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = {
+ val key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
+ val value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
+
+ new ConsumerRecord[K, V](
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.timestamp(),
+ record.timestampType(),
+ record.serializedKeySize(),
+ record.serializedValueSize(),
+ key,
+ value,
+ record.headers(),
+ record.leaderEpoch()
+ )
+ }
+
+ override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false
+
+}
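A minimal sketch of implementing the extracted trait directly, assuming plain Kafka StringDeserializers for both key and value (not something this change adds):

    import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

    val stringRecordSchema = new ConsumerRecordKafkaDeserializationSchema[String, String] {
      @transient override protected lazy val keyDeserializer: Deserializer[String]   = new StringDeserializer
      @transient override protected lazy val valueDeserializer: Deserializer[String] = new StringDeserializer
    }
    // stringRecordSchema.deserialize(rawRecord) rewraps the raw ConsumerRecord[Array[Byte], Array[Byte]]
    // into a ConsumerRecord[String, String], preserving topic, partition, offset, timestamps and headers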
diff --git a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala
index 41e7dae7b0f..8a6d6f93081 100644
--- a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala
+++ b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala
@@ -1,6 +1,5 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord
-import com.github.ghik.silencer.silent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType
import pl.touk.nussknacker.engine.api.process.TopicName
@@ -28,7 +27,6 @@ case class SerializableConsumerRecord[K, V](
/**
* Converts SerializableConsumerRecord to ConsumerRecord, uses default values in case of missing attributes.
*/
- @silent("deprecated") // using deprecated constructor for Flink 1.14/15 compatibility
def toKafkaConsumerRecord(
formatterTopic: TopicName.ForSource,
serializeKeyValue: (Option[K], V) => (Array[Byte], Array[Byte])
@@ -43,7 +41,6 @@ case class SerializableConsumerRecord[K, V](
offset.getOrElse(0L),
timestamp.getOrElse(ConsumerRecord.NO_TIMESTAMP),
timestampType.map(TimestampType.forName).getOrElse(TimestampType.NO_TIMESTAMP_TYPE),
- ConsumerRecord.NULL_CHECKSUM.longValue(),
ConsumerRecord.NULL_SIZE,
ConsumerRecord.NULL_SIZE,
keyBytes,
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala
index 36e5a2b1720..acb539b703c 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala
@@ -35,7 +35,7 @@ object UniversalSchemaBasedSerdeProvider {
)
}
- private def createSchemaIdFromMessageExtractor(
+ def createSchemaIdFromMessageExtractor(
schemaRegistryClient: SchemaRegistryClient
): ChainedSchemaIdFromMessageExtractor = {
val isConfluent = schemaRegistryClient.isInstanceOf[ConfluentSchemaRegistryClient]
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala
index f5490f82b1b..fc6ee285886 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala
@@ -1,11 +1,11 @@
package pl.touk.nussknacker.engine.schemedkafka.serialization
-import com.github.ghik.silencer.silent
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
import pl.touk.nussknacker.engine.kafka.KafkaConfig
+import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordKafkaDeserializationSchema
import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchema
import scala.reflect.ClassTag
@@ -18,6 +18,17 @@ import scala.reflect.ClassTag
abstract class KafkaSchemaBasedKeyValueDeserializationSchemaFactory
extends KafkaSchemaBasedDeserializationSchemaFactory {
+ protected def createKeyOrUseStringDeserializer[K: ClassTag](
+ schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
+ kafkaConfig: KafkaConfig
+ ): Deserializer[K] = {
+ if (kafkaConfig.useStringForKey) {
+ createStringKeyDeserializer.asInstanceOf[Deserializer[K]]
+ } else {
+ createKeyDeserializer[K](schemaDataOpt, kafkaConfig)
+ }
+ }
+
protected def createKeyDeserializer[K: ClassTag](
schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
kafkaConfig: KafkaConfig
@@ -36,40 +47,18 @@ abstract class KafkaSchemaBasedKeyValueDeserializationSchemaFactory
valueSchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]]
): KafkaDeserializationSchema[ConsumerRecord[K, V]] = {
- new KafkaDeserializationSchema[ConsumerRecord[K, V]] {
+ new ConsumerRecordKafkaDeserializationSchema[K, V] {
@transient
- private lazy val keyDeserializer = if (kafkaConfig.useStringForKey) {
- createStringKeyDeserializer.asInstanceOf[Deserializer[K]]
- } else {
- createKeyDeserializer[K](keySchemaDataOpt, kafkaConfig)
- }
- @transient
- private lazy val valueDeserializer = createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig)
-
- @silent("deprecated") // using deprecated constructor for Flink 1.14/15 compatibility
- override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = {
- val key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
- val value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
- new ConsumerRecord[K, V](
- record.topic(),
- record.partition(),
- record.offset(),
- record.timestamp(),
- record.timestampType(),
- ConsumerRecord.NULL_CHECKSUM.longValue(),
- record.serializedKeySize(),
- record.serializedValueSize(),
- key,
- value,
- record.headers(),
- record.leaderEpoch()
- )
- }
+ override protected lazy val keyDeserializer: Deserializer[K] =
+ createKeyOrUseStringDeserializer[K](keySchemaDataOpt, kafkaConfig)
- override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false
+ @transient
+ override protected lazy val valueDeserializer: Deserializer[V] =
+ createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig)
}
+
}
}
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala
index a672d844e35..2c584efc02e 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala
@@ -238,6 +238,7 @@ class UniversalKafkaSinkFactory(
Option(finalState.schema),
kafkaConfig
)
+
val clientId = s"${TypedNodeDependency[MetaData].extract(dependencies).name}-${preparedTopic.prepared}"
val validationMode = if (params.extractUnsafe[Boolean](sinkRawEditorParamName)) {
validationModeParamDeclaration.extractValue(params) match {
diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala
new file mode 100644
index 00000000000..5a6825da1cb
--- /dev/null
+++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala
@@ -0,0 +1,25 @@
+package pl.touk.nussknacker.test
+
+import scala.util.Random
+
+object RandomImplicits {
+
+ implicit class RandomExt(rand: Random) {
+
+ private val AllowedStringLetters = ('a' to 'z') ++ ('A' to 'Z')
+
+ private val MinStringLength = 4
+
+ private val MaxStringLength = 32
+
+ def nextString(): String =
+ randomString(MinStringLength + rand.nextInt(MaxStringLength - MinStringLength))
+
+ def randomString(length: Int): String = {
+ require(length >= 0, "Length must be non-negative")
+ (0 until length).map(_ => AllowedStringLetters(rand.nextInt(AllowedStringLetters.length))).mkString
+ }
+
+ }
+
+}
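Example usage of the extension (a sketch):

    import scala.util.Random
    import pl.touk.nussknacker.test.RandomImplicits._

    val randomName = Random.nextString()     // between 4 and 31 letters from [a-zA-Z]
    val fixedName  = Random.randomString(10) // exactly 10 letters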