Skip to content

Commit

Permalink
Merge branch 'master' into schatt-3066-restrict-workflow-copy-action-…
Browse files Browse the repository at this point in the history
…for-restricted-users
  • Loading branch information
paulschatt authored Jan 13, 2025
2 parents 48557b5 + ae0c7bc commit 0395840
Show file tree
Hide file tree
Showing 28 changed files with 1,983 additions and 1,312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.Tables.{
OPERATOR_EXECUTIONS,
OPERATOR_RUNTIME_STATISTICS,
WORKFLOW_EXECUTIONS,
WORKFLOW_RUNTIME_STATISTICS,
WORKFLOW_VERSION
}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowRuntimeStatistics
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.WorkflowRuntimeStatistics
import org.jooq.types.UInteger

import scala.jdk.CollectionConverters.ListHasAsScala
Expand Down Expand Up @@ -99,46 +100,48 @@ class DefaultCostEstimator(
val widAsUInteger = UInteger.valueOf(wid)
val rawStats = context
.select(
WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID,
WORKFLOW_RUNTIME_STATISTICS.TIME,
WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID
OPERATOR_EXECUTIONS.OPERATOR_ID,
OPERATOR_RUNTIME_STATISTICS.INPUT_TUPLE_CNT,
OPERATOR_RUNTIME_STATISTICS.OUTPUT_TUPLE_CNT,
OPERATOR_RUNTIME_STATISTICS.TIME,
OPERATOR_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
OPERATOR_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
OPERATOR_RUNTIME_STATISTICS.IDLE_TIME,
OPERATOR_RUNTIME_STATISTICS.NUM_WORKERS
)
.from(OPERATOR_EXECUTIONS)
.join(OPERATOR_RUNTIME_STATISTICS)
.on(
OPERATOR_EXECUTIONS.OPERATOR_EXECUTION_ID.eq(
OPERATOR_RUNTIME_STATISTICS.OPERATOR_EXECUTION_ID
)
)
.from(WORKFLOW_RUNTIME_STATISTICS)
.where(
WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID
.eq(widAsUInteger)
.and(
WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(
context
.select(
WORKFLOW_EXECUTIONS.EID
)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.where(
WORKFLOW_VERSION.WID
.eq(widAsUInteger)
.and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
)
.orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
.limit(1)
OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(
context
.select(WORKFLOW_EXECUTIONS.EID)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.where(
WORKFLOW_VERSION.WID
.eq(widAsUInteger)
.and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
)
)
.orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
.limit(1)
)
)
.orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID)
.fetchInto(classOf[WorkflowRuntimeStatistics])
.asScala
.toList
if (rawStats.isEmpty) {
None
} else {
val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) =>
val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0)
acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime
.doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9))
val opTotalExecutionTime = acc.getOrElse(stat.operatorId, 0.0)
acc + (stat.operatorId -> (opTotalExecutionTime + (stat.dataProcessingTime
.doubleValue() + stat.controlProcessingTime.doubleValue()) / 1e9))
}
Some(cumulatedStats)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.dao.jooq.generated.Tables.{
USER,
WORKFLOW_EXECUTIONS,
WORKFLOW_RUNTIME_STATISTICS,
OPERATOR_EXECUTIONS,
OPERATOR_RUNTIME_STATISTICS,
WORKFLOW_VERSION
}
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
WorkflowExecutionsDao,
WorkflowRuntimeStatisticsDao
OperatorExecutionsDao,
OperatorRuntimeStatisticsDao,
WorkflowExecutionsDao
}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
WorkflowExecutions,
WorkflowRuntimeStatistics
OperatorExecutions,
OperatorRuntimeStatistics
}
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
Expand All @@ -40,7 +43,10 @@ object WorkflowExecutionsResource {
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.createDSLContext()
final private lazy val executionsDao = new WorkflowExecutionsDao(context.configuration)
final private lazy val workflowRuntimeStatisticsDao = new WorkflowRuntimeStatisticsDao(
// DAO for OperatorExecutions POJOs; created lazily from the configuration of the shared `context`.
final private lazy val operatorExecutionsDao = new OperatorExecutionsDao(
context.configuration
)
// DAO for OperatorRuntimeStatistics POJOs; uses the same `context` configuration.
final private lazy val operatorRuntimeStatisticsDao = new OperatorRuntimeStatisticsDao(
context.configuration
)

Expand Down Expand Up @@ -87,8 +93,19 @@ object WorkflowExecutionsResource {
}
}

def insertWorkflowRuntimeStatistics(list: util.ArrayList[WorkflowRuntimeStatistics]): Unit = {
workflowRuntimeStatisticsDao.insert(list);
/**
  * Persists the given operator executions and reports the execution id held by each one.
  *
  * The list is handed to `operatorExecutionsDao.insert`; afterwards every element's
  * `getOperatorExecutionId` is read back and stored under its `getOperatorId`.
  * NOTE(review): this relies on the DAO writing the generated id back into each POJO during
  * the insert -- confirm against the configured jOOQ settings.
  *
  * @param list operator executions to insert
  * @return a map from operator id to the operator execution id found on the POJO after insertion;
  *         if the list contains the same operator id twice, the later entry wins
  */
def insertOperatorExecutions(
    list: util.ArrayList[OperatorExecutions]
): util.HashMap[String, ULong] = {
  operatorExecutionsDao.insert(list)

  val idsByOperator = new util.HashMap[String, ULong]()
  val entries = list.iterator()
  while (entries.hasNext) {
    val inserted = entries.next()
    idsByOperator.put(inserted.getOperatorId, inserted.getOperatorExecutionId)
  }
  idsByOperator
}

/**
  * Inserts the given runtime statistics entries through `operatorRuntimeStatisticsDao`.
  *
  * @param list statistics entries to insert
  */
def insertOperatorRuntimeStatistics(list: util.ArrayList[OperatorRuntimeStatistics]): Unit =
  operatorRuntimeStatisticsDao.insert(list)

case class WorkflowExecutionEntry(
Expand All @@ -105,15 +122,10 @@ object WorkflowExecutionsResource {
logLocation: String
)

case class ExecutionResultEntry(
eId: UInteger,
result: String
)

case class OperatorRuntimeStatistics(
case class WorkflowRuntimeStatistics(
operatorId: String,
inputTupleCount: UInteger,
outputTupleCount: UInteger,
inputTupleCount: ULong,
outputTupleCount: ULong,
timestamp: Timestamp,
dataProcessingTime: ULong,
controlProcessingTime: ULong,
Expand Down Expand Up @@ -222,26 +234,36 @@ class WorkflowExecutionsResource {
def retrieveWorkflowRuntimeStatistics(
@PathParam("wid") wid: UInteger,
@PathParam("eid") eid: UInteger
): List[OperatorRuntimeStatistics] = {
): List[WorkflowRuntimeStatistics] = {
context
.select(
WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID,
WORKFLOW_RUNTIME_STATISTICS.INPUT_TUPLE_CNT,
WORKFLOW_RUNTIME_STATISTICS.OUTPUT_TUPLE_CNT,
WORKFLOW_RUNTIME_STATISTICS.TIME,
WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
WORKFLOW_RUNTIME_STATISTICS.IDLE_TIME,
WORKFLOW_RUNTIME_STATISTICS.NUM_WORKERS
OPERATOR_EXECUTIONS.OPERATOR_ID,
OPERATOR_RUNTIME_STATISTICS.INPUT_TUPLE_CNT,
OPERATOR_RUNTIME_STATISTICS.OUTPUT_TUPLE_CNT,
OPERATOR_RUNTIME_STATISTICS.TIME,
OPERATOR_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
OPERATOR_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
OPERATOR_RUNTIME_STATISTICS.IDLE_TIME,
OPERATOR_RUNTIME_STATISTICS.NUM_WORKERS
)
.from(OPERATOR_RUNTIME_STATISTICS)
.join(OPERATOR_EXECUTIONS)
.on(
OPERATOR_RUNTIME_STATISTICS.OPERATOR_EXECUTION_ID.eq(
OPERATOR_EXECUTIONS.OPERATOR_EXECUTION_ID
)
)
.from(WORKFLOW_RUNTIME_STATISTICS)
.join(WORKFLOW_EXECUTIONS)
.on(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(WORKFLOW_EXECUTIONS.EID))
.join(WORKFLOW_VERSION)
.on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
.where(
WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID
WORKFLOW_VERSION.WID
.eq(wid)
.and(WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(eid))
.and(WORKFLOW_EXECUTIONS.EID.eq(eid))
)
.orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID)
.fetchInto(classOf[OperatorRuntimeStatistics])
.orderBy(OPERATOR_RUNTIME_STATISTICS.TIME, OPERATOR_EXECUTIONS.OPERATOR_ID)
.fetchInto(classOf[WorkflowRuntimeStatistics])
.asScala
.toList
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package edu.uci.ics.texera.web.service

import com.google.protobuf.timestamp.Timestamp
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.controller.{
ExecutionStatsUpdate,
FatalError,
Expand All @@ -24,7 +23,7 @@ import edu.uci.ics.amber.error.ErrorUtils.{getOperatorFromActorIdOpt, getStackTr
import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.web.SubscriptionManager
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowRuntimeStatistics
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.OperatorRuntimeStatistics
import edu.uci.ics.texera.web.model.websocket.event.{
ExecutionDurationUpdateEvent,
OperatorAggregatedMetrics,
Expand All @@ -43,7 +42,7 @@ import java.util.concurrent.Executors
class ExecutionStatsService(
client: AmberClient,
stateStore: ExecutionStateStore,
workflowContext: WorkflowContext
operatorIdToExecutionId: Map[String, ULong]
) extends SubscriptionManager
with LazyLogging {
private val metricsPersistThread = Executors.newSingleThreadExecutor()
Expand Down Expand Up @@ -201,31 +200,34 @@ class ExecutionStatsService(
private def storeRuntimeStatistics(
operatorStatistics: scala.collection.immutable.Map[String, OperatorMetrics]
): Unit = {
// Add a try-catch to not produce an error when "workflow_runtime_statistics" table does not exist in MySQL
try {
val list: util.ArrayList[WorkflowRuntimeStatistics] =
new util.ArrayList[WorkflowRuntimeStatistics]()
val runtimeStatsList: util.ArrayList[OperatorRuntimeStatistics] =
new util.ArrayList[OperatorRuntimeStatistics]()

for ((operatorId, stat) <- operatorStatistics) {
val execution = new WorkflowRuntimeStatistics()
execution.setWorkflowId(UInteger.valueOf(workflowContext.workflowId.id))
execution.setExecutionId(UInteger.valueOf(workflowContext.executionId.id))
execution.setOperatorId(operatorId)
execution.setInputTupleCnt(
UInteger.valueOf(stat.operatorStatistics.inputCount.map(_.tupleCount).sum)
// Create and populate the operator runtime statistics entry
val runtimeStats = new OperatorRuntimeStatistics()
runtimeStats.setOperatorExecutionId(operatorIdToExecutionId(operatorId))
runtimeStats.setInputTupleCnt(
ULong.valueOf(stat.operatorStatistics.inputCount.map(_.tupleCount).sum)
)
runtimeStats.setOutputTupleCnt(
ULong.valueOf(stat.operatorStatistics.outputCount.map(_.tupleCount).sum)
)
execution.setOutputTupleCnt(
UInteger.valueOf(stat.operatorStatistics.outputCount.map(_.tupleCount).sum)
runtimeStats.setStatus(maptoStatusCode(stat.operatorState))
runtimeStats.setDataProcessingTime(
ULong.valueOf(stat.operatorStatistics.dataProcessingTime)
)
execution.setStatus(maptoStatusCode(stat.operatorState))
execution.setDataProcessingTime(ULong.valueOf(stat.operatorStatistics.dataProcessingTime))
execution.setControlProcessingTime(
runtimeStats.setControlProcessingTime(
ULong.valueOf(stat.operatorStatistics.controlProcessingTime)
)
execution.setIdleTime(ULong.valueOf(stat.operatorStatistics.idleTime))
execution.setNumWorkers(UInteger.valueOf(stat.operatorStatistics.numWorkers))
list.add(execution)
runtimeStats.setIdleTime(ULong.valueOf(stat.operatorStatistics.idleTime))
runtimeStats.setNumWorkers(UInteger.valueOf(stat.operatorStatistics.numWorkers))
runtimeStatsList.add(runtimeStats)
}
WorkflowExecutionsResource.insertWorkflowRuntimeStatistics(list)

// Insert into operator_runtime_statistics table
WorkflowExecutionsResource.insertOperatorRuntimeStatistics(runtimeStatsList)
} catch {
case err: Throwable => logger.error("error occurred when storing runtime statistics", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
import edu.uci.ics.amber.engine.common.Utils
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.OperatorExecutions
import edu.uci.ics.texera.web.model.websocket.event.{
TexeraWebSocketEvent,
WorkflowErrorEvent,
WorkflowStateEvent
}
import edu.uci.ics.texera.web.model.websocket.request.WorkflowExecuteRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.storage.ExecutionStateStore
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.texera.web.{ComputingUnitMaster, SubscriptionManager, WebsocketInput}
import edu.uci.ics.texera.workflow.{LogicalPlan, WorkflowCompiler}
import org.jooq.types.{UInteger, ULong}

import java.net.URI
import java.util
import scala.collection.mutable

class WorkflowExecutionService(
Expand Down Expand Up @@ -103,7 +107,10 @@ class WorkflowExecutionService(
)
executionReconfigurationService =
new ExecutionReconfigurationService(client, executionStateStore, workflow)
executionStatsService = new ExecutionStatsService(client, executionStateStore, workflowContext)
// Create the operatorId to executionId map
val operatorIdToExecutionId = createOperatorIdToExecutionIdMap(workflow)
executionStatsService =
new ExecutionStatsService(client, executionStateStore, operatorIdToExecutionId)
executionRuntimeService = new ExecutionRuntimeService(
client,
executionStateStore,
Expand Down Expand Up @@ -135,6 +142,27 @@ class WorkflowExecutionService(
)
}

/**
  * Registers one OperatorExecutions entry per logical operator of `workflow` and returns the
  * operator-id to operator-execution-id mapping reported by the insert.
  *
  * Each entry is tagged with the execution id taken from `workflowContext` and the operator's
  * identifier, then the whole batch is passed to
  * `WorkflowExecutionsResource.insertOperatorExecutions`.
  *
  * @param workflow workflow whose logical-plan operators are registered
  * @return immutable map from operator id to the id returned for its inserted execution entry
  */
private def createOperatorIdToExecutionIdMap(workflow: Workflow): Map[String, ULong] = {
  val pendingExecutions = new util.ArrayList[OperatorExecutions]()
  for (logicalOp <- workflow.logicalPlan.operators) {
    val entry = new OperatorExecutions()
    entry.setWorkflowExecutionId(UInteger.valueOf(workflowContext.executionId.id))
    entry.setOperatorId(logicalOp.operatorIdentifier.id)
    pendingExecutions.add(entry)
  }

  val persistedIds = WorkflowExecutionsResource.insertOperatorExecutions(pendingExecutions)

  // Copy the Java map returned by the resource into an immutable Scala map.
  val idsByOperator = Map.newBuilder[String, ULong]
  persistedIds.forEach((operatorId, executionId) => idsByOperator += (operatorId -> executionId))
  idsByOperator.result()
}

override def unsubscribeAll(): Unit = {
super.unsubscribeAll()
if (client != null) {
Expand Down
Loading

0 comments on commit 0395840

Please sign in to comment.