Skip to content

Commit

Permalink
[NU-1772] ScenarioActivity-based activity log (#7039)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko authored Oct 24, 2024
1 parent aece59e commit a4d84cc
Show file tree
Hide file tree
Showing 24 changed files with 791 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class ScenarioActivityApiHttpService(

private def editComment(request: EditCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(
scenarioActivityRepository.editComment(
Expand All @@ -542,14 +542,14 @@ class ScenarioActivityApiHttpService(

private def deleteComment(request: DeprecatedDeleteCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(scenarioActivityRepository.deleteComment(scenarioId, request.commentId))
).leftMap(_ => NoComment(request.commentId))

private def deleteComment(request: DeleteCommentRequest, scenarioId: ProcessId)(
implicit loggedUser: LoggedUser
): EitherT[Future, ScenarioActivityError, Unit] =
): EitherT[Future, ScenarioActivityError, ScenarioActivityId] =
EitherT(
dbioActionRunner.run(
scenarioActivityRepository.deleteComment(scenarioId, ScenarioActivityId(request.scenarioActivityId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,13 @@ class DBProcessService(
DBIOAction.seq(
processRepository.archive(processId = process.idWithNameUnsafe, isArchived = false),
scenarioActionRepository
.markProcessAsUnArchived(processId = process.processIdUnsafe, process.processVersionId)
.addInstantAction(
process.processIdUnsafe,
process.processVersionId,
ScenarioActionName.UnArchive,
None,
None
)
)
)
} else {
Expand Down Expand Up @@ -527,7 +533,13 @@ class DBProcessService(
.runInTransaction(
DBIOAction.seq(
processRepository.archive(processId = process.idWithNameUnsafe, isArchived = true),
scenarioActionRepository.markProcessAsArchived(processId = process.processIdUnsafe, process.processVersionId)
scenarioActionRepository.addInstantAction(
process.processIdUnsafe,
process.processVersionId,
ScenarioActionName.Archive,
None,
None
)
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package pl.touk.nussknacker.ui.process

import cats.effect.IO
import com.typesafe.scalalogging.Logger
import org.slf4j.{LoggerFactory, MDC}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.ui.process.ScenarioAttachmentService.AttachmentToAdd
import pl.touk.nussknacker.ui.security.api.LoggedUser

object ScenarioActivityAuditLog {

private val logger = Logger(LoggerFactory.getLogger(s"scenario-activity-audit"))

def onCreateScenarioActivity(
scenarioActivity: ScenarioActivity
): IO[Unit] =
logWithContext(scenarioActivity.scenarioId, scenarioActivity.scenarioVersionId, scenarioActivity.user.name.value)(
s"New activity: ${stringify(scenarioActivity)}"
)

def onEditComment(
processId: ProcessId,
user: LoggedUser,
scenarioActivityId: ScenarioActivityId,
comment: String
): IO[Unit] = {
logWithContext(ScenarioId(processId.value), None, user.username)(
s"[commentId=${scenarioActivityId.value.toString}] Comment edited, new value: [$comment]"
)
}

def onDeleteComment(
processId: ProcessId,
rowId: Long,
user: LoggedUser,
): IO[Unit] =
logWithContext(ScenarioId(processId.value), None, user.username)(
s"Comment with rowId=$rowId deleted"
)

def onDeleteComment(
processId: ProcessId,
activityId: ScenarioActivityId,
user: LoggedUser,
): IO[Unit] =
logWithContext(ScenarioId(processId.value), None, user.username)(
s"Comment for activityId=${activityId.value} deleted"
)

def onAddAttachment(
attachmentToAdd: AttachmentToAdd,
user: LoggedUser,
): IO[Unit] =
logWithContext(
ScenarioId(attachmentToAdd.scenarioId.value),
Some(ScenarioVersionId.from(attachmentToAdd.scenarioVersionId)),
user.username
)(s"Attachment added: [${attachmentToAdd.fileName}]")

def onDeleteAttachment(
scenarioId: ProcessId,
attachmentId: Long,
user: LoggedUser,
): IO[Unit] =
logWithContext(
ScenarioId(scenarioId.value),
None,
user.username
)(s"Attachment deleted: [attachmentId=$attachmentId]")

def onScenarioImmediateAction(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
user: LoggedUser
): IO[Unit] =
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Immediate scenario action [actionName=${actionName.value},actionId=${processActionId.value}]"
)

def onScenarioActionStarted(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
user: LoggedUser
): IO[Unit] =
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] started"
)

def onScenarioActionFinishedWithSuccess(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
comment: Option[String],
user: LoggedUser
): IO[Unit] = {
val commentValue = comment match {
case Some(content) => s"comment [$content]"
case None => "without comment"
}
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] finished with success and $commentValue "
)
}

def onScenarioActionFinishedWithFailure(
processActionId: ProcessActionId,
processId: ProcessId,
actionName: ScenarioActionName,
processVersion: Option[VersionId],
comment: Option[String],
failureMessage: String,
user: LoggedUser
): IO[Unit] = {
val commentValue = comment match {
case Some(content) => s"with comment [$content]"
case None => "without comment"
}
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionName=${actionName.value},actionId=${processActionId.value}] finished with failure [$failureMessage] $commentValue"
)
}

def onScenarioActionRemoved(
processActionId: ProcessActionId,
processId: ProcessId,
processVersion: Option[VersionId],
user: LoggedUser
): IO[Unit] = {
logWithContext(ScenarioId(processId.value), processVersion.map(ScenarioVersionId.from), user.username)(
s"Scenario action [actionId=${processActionId.value}] removed"
)
}

private def stringify(scenarioActivity: ScenarioActivity): String = scenarioActivity match {
case ScenarioActivity.ScenarioDeployed(_, _, _, _, _, comment, result) =>
s"ScenarioDeployed(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.ScenarioPaused(_, _, _, _, _, comment, result) =>
s"ScenarioPaused(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.ScenarioCanceled(_, _, _, _, _, comment, result) =>
s"ScenarioCanceled(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.CustomAction(_, _, _, _, _, actionName, comment, result) =>
s"CustomAction(action=$actionName,comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.PerformedSingleExecution(_, _, _, _, _, comment, result) =>
s"PerformedSingleExecution(comment=${stringify(comment)},result=${stringify(result)})"
case ScenarioActivity.PerformedScheduledExecution(_, _, _, _, _, status, _, scheduleName, _, _, _) =>
s"PerformedScheduledExecution(scheduleName=$scheduleName,scheduledExecutionStatus=${status.entryName})"
case ScenarioActivity.ScenarioCreated(_, _, _, _, _) =>
"ScenarioCreated"
case ScenarioActivity.ScenarioArchived(_, _, _, _, _) =>
"ScenarioArchived"
case ScenarioActivity.ScenarioUnarchived(_, _, _, _, _) =>
"ScenarioUnarchived"
case ScenarioActivity.ScenarioModified(_, _, _, _, _, _, comment) =>
s"ScenarioModified(comment=${stringify(comment)})"
case ScenarioActivity.ScenarioNameChanged(_, _, _, _, _, oldName, newName) =>
s"ScenarioNameChanged(oldName=$oldName,newName=$newName)"
case ScenarioActivity.CommentAdded(_, _, _, _, _, comment) =>
s"CommentAdded(comment=${stringify(comment)})"
case ScenarioActivity.AttachmentAdded(_, _, _, _, _, attachment) =>
s"AttachmentAdded(fileName=${stringify(attachment)})"
case ScenarioActivity.ChangedProcessingMode(_, _, _, _, _, from, to) =>
s"ChangedProcessingMode(from=$from,to=$to)"
case ScenarioActivity.IncomingMigration(_, _, _, _, _, sourceEnvironment, sourceUser, sourceVersionId, _) =>
s"IncomingMigration(sourceEnvironment=${sourceEnvironment.name},sourceUser=${sourceUser.value},sourceVersionId=${sourceVersionId
.map(_.value.toString)
.getOrElse("[none]")})"
case ScenarioActivity.OutgoingMigration(_, _, _, _, _, destinationEnvironment) =>
s"OutgoingMigration(destinationEnvironment=${destinationEnvironment.name})"
case ScenarioActivity.AutomaticUpdate(_, _, _, _, _, changes) =>
s"AutomaticUpdate(changes=$changes)"
}

private def stringify(attachment: ScenarioAttachment): String = attachment match {
case ScenarioAttachment.Available(_, attachmentFilename, _, _) => s"Available(${attachmentFilename.value})"
case ScenarioAttachment.Deleted(attachmentFilename, _, _) => s"Deleted(${attachmentFilename.value})"
}

private def stringify(comment: ScenarioComment): String = comment match {
case ScenarioComment.WithContent(comment, _, _) => comment
case ScenarioComment.WithoutContent(_, _) => "none"
}

private def stringify(result: DeploymentResult): String = result match {
case DeploymentResult.Success(_) => "Success"
case DeploymentResult.Failure(_, errorMessage) => s"Failure($errorMessage)"
}

private def logWithContext(
scenarioId: ScenarioId,
scenarioVersionId: Option[ScenarioVersionId],
username: String,
)(log: String): IO[Unit] = IO.delay {
MDC.clear()
MDC.put("scenarioId", scenarioId.value.toString)
MDC.put("scenarioVersionId", scenarioVersionId.map(_.value.toString).getOrElse("none"))
MDC.put("username", username)
logger.info(log)
MDC.clear()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object DefaultProcessingTypeDeployedScenariosProvider {
val dumbModelInfoProvier = ProcessingTypeDataProvider.withEmptyCombinedData(
Map(processingType -> ValueWithRestriction.anyUser(Map.empty[String, String]))
)
val actionRepository = new DbScenarioActionRepository(dbRef, dumbModelInfoProvier)
val actionRepository = DbScenarioActionRepository.create(dbRef, dumbModelInfoProvier)
val scenarioLabelsRepository = new ScenarioLabelsRepository(dbRef)
val processRepository = DBFetchingProcessRepository.create(dbRef, actionRepository, scenarioLabelsRepository)
val futureProcessRepository =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.implicits.{toFoldableOps, toTraverseOps}
import cats.syntax.functor._
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy}
import pl.touk.nussknacker.engine.api.deployment._
Expand All @@ -19,7 +20,6 @@ import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser}
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess}
import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser}
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.ui.process.ProcessStateProvider
import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions._
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps
Expand Down Expand Up @@ -48,7 +48,7 @@ import scala.util.{Failure, Success}
class DeploymentService(
dispatcher: DeploymentManagerDispatcher,
processRepository: FetchingProcessRepository[DB],
actionRepository: DbScenarioActionRepository,
actionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
processValidator: ProcessingTypeDataProvider[UIProcessValidator, _],
scenarioResolver: ProcessingTypeDataProvider[ScenarioResolver, _],
Expand Down Expand Up @@ -141,7 +141,7 @@ class DeploymentService(
actionResult <- validateBeforeDeploy(ctx.latestScenarioDetails, deployedScenarioData, updateStrategy)
.transformWith {
case Failure(ex) =>
removeInvalidAction(ctx.actionId).transform(_ => Failure(ex))
removeInvalidAction(ctx).transform(_ => Failure(ex))
case Success(_) =>
// we notify of deployment finish/fail only if initial validation succeeded
val deploymentFuture = runActionAndHandleResults(
Expand Down Expand Up @@ -170,10 +170,9 @@ class DeploymentService(
getBuildInfoProcessingType: ScenarioWithDetailsEntity[PS] => Option[ProcessingType]
)(implicit user: LoggedUser): Future[CommandContext[PS]] = {
implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
dbioRunner.runInTransaction(
// 1.1 lock for critical section
transactionallyRunCriticalSection(
for {
// 1.1 lock for critical section
_ <- actionRepository.lockActionsTable
// 1.2. fetch scenario data
processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[PS](processId.id)
processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processId.name))
Expand Down Expand Up @@ -207,6 +206,10 @@ class DeploymentService(
)
}

private def transactionallyRunCriticalSection[T](dbioAction: DB[T]) = {
dbioRunner.runInTransaction(actionRepository.withLockedTable(dbioAction))
}

// TODO: Use buildInfo explicitly instead of ProcessingType-that-is-used-to-calculate-buildInfo
private case class CommandContext[PS: ScenarioShapeFetchStrategy](
latestScenarioDetails: ScenarioWithDetailsEntity[PS],
Expand Down Expand Up @@ -387,14 +390,22 @@ class DeploymentService(
// Before we can do that we should check if we somewhere rely on fact that version is always defined -
// see ProcessAction.processVersionId
logger.info(s"Action $actionString finished for action without version id - skipping listener notification")
removeInvalidAction(ctx.actionId)
removeInvalidAction(ctx)
}
.map(_ => result)
}
}

private def removeInvalidAction(actionId: ProcessActionId): Future[Unit] = {
dbioRunner.runInTransaction(actionRepository.removeAction(actionId))
private def removeInvalidAction[PS: ScenarioShapeFetchStrategy](
context: CommandContext[PS]
)(implicit user: LoggedUser): Future[Unit] = {
dbioRunner.runInTransaction(
actionRepository.removeAction(
context.actionId,
context.latestScenarioDetails.processId,
context.versionOnWhichActionIsDone
)
)
}

// TODO: check deployment id to be sure that returned status is for given deployment
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.touk.nussknacker.ui.process.repository

import db.util.DBIOActionInstances.DB
import slick.lifted.{AbstractTable, TableQuery => LTableQuery}

import scala.concurrent.ExecutionContext

trait LockableTable {

def withLockedTable[T](
dbioAction: DB[T]
): DB[T]

}

trait DbLockableTable { this: DbioRepository =>

import profile.api._

type ENTITY <: AbstractTable[_]

protected implicit def executionContext: ExecutionContext

protected def table: LTableQuery[ENTITY]

def withLockedTable[T](
dbioAction: DB[T]
): DB[T] = for {
_ <- lockTable
result <- dbioAction
} yield result

private def lockTable: DB[Unit] = {
run(table.filter(_ => false).forUpdate.result.map(_ => ()))
}

}
Loading

0 comments on commit a4d84cc

Please sign in to comment.