Commit
Merge branch 'master' into Zhe-categorical-col-stats
MiuMiuMiue authored Jan 16, 2025
2 parents e23d15d + 64007f7 commit 9949641
Showing 88 changed files with 2,989 additions and 2,089 deletions.
9 changes: 0 additions & 9 deletions core/amber/src/main/resources/application.conf
@@ -80,15 +80,6 @@ schedule-generator {
   search-timeout-milliseconds = 1000
 }
 
-python-language-server{
-  provider = "pyright" # valid options: ["pyright", "pylsp"]
-  port = 3000
-  # Maximum number of retries for starting the language server.
-  retry-counts = 3
-  # Time in milliseconds to wait between retry attempts when starting the language server
-  wait-time-ms = 200
-}
-
 ai-assistant-server{
   assistant = "none"
   # Put your Ai Service authentication key here
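The deleted `python-language-server` block has a companion change in `AmberConfig` later in this commit, where its accessor is removed. A minimal sketch, assuming only the standard Typesafe Config API, of why the two must go together, and of the guarded pattern the remaining `ai-assistant-server` block uses:

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigAccessSketch {
  // application.conf is loaded from the classpath
  private val conf: Config = ConfigFactory.load()

  // An unguarded lookup of a deleted block throws ConfigException.Missing at
  // startup, which is why AmberConfig's pythonLanguageServerConfig is removed
  // in the same commit:
  // val pythonLsp: Config = conf.getConfig("python-language-server")

  // Guarded lookup, as AmberConfig does for ai-assistant-server:
  val aiAssistantConfig: Option[Config] =
    if (conf.hasPath("ai-assistant-server")) Some(conf.getConfig("ai-assistant-server"))
    else None
}
```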
DefaultCostEstimator.scala

@@ -8,11 +8,12 @@ import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
 import edu.uci.ics.texera.dao.SqlServer
 import edu.uci.ics.texera.dao.SqlServer.withTransaction
 import edu.uci.ics.texera.dao.jooq.generated.Tables.{
+  OPERATOR_EXECUTIONS,
+  OPERATOR_RUNTIME_STATISTICS,
   WORKFLOW_EXECUTIONS,
-  WORKFLOW_RUNTIME_STATISTICS,
   WORKFLOW_VERSION
 }
-import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowRuntimeStatistics
+import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.WorkflowRuntimeStatistics
 import org.jooq.types.UInteger
 
 import scala.jdk.CollectionConverters.ListHasAsScala
@@ -99,46 +100,48 @@ class DefaultCostEstimator(
     val widAsUInteger = UInteger.valueOf(wid)
     val rawStats = context
       .select(
-        WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID,
-        WORKFLOW_RUNTIME_STATISTICS.TIME,
-        WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
-        WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
-        WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID
+        OPERATOR_EXECUTIONS.OPERATOR_ID,
+        OPERATOR_RUNTIME_STATISTICS.INPUT_TUPLE_CNT,
+        OPERATOR_RUNTIME_STATISTICS.OUTPUT_TUPLE_CNT,
+        OPERATOR_RUNTIME_STATISTICS.TIME,
+        OPERATOR_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
+        OPERATOR_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
+        OPERATOR_RUNTIME_STATISTICS.IDLE_TIME,
+        OPERATOR_RUNTIME_STATISTICS.NUM_WORKERS
       )
-      .from(WORKFLOW_RUNTIME_STATISTICS)
+      .from(OPERATOR_EXECUTIONS)
+      .join(OPERATOR_RUNTIME_STATISTICS)
+      .on(
+        OPERATOR_EXECUTIONS.OPERATOR_EXECUTION_ID.eq(
+          OPERATOR_RUNTIME_STATISTICS.OPERATOR_EXECUTION_ID
+        )
+      )
       .where(
-        WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID
-          .eq(widAsUInteger)
-          .and(
-            WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(
-              context
-                .select(
-                  WORKFLOW_EXECUTIONS.EID
-                )
-                .from(WORKFLOW_EXECUTIONS)
-                .join(WORKFLOW_VERSION)
-                .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
-                .where(
-                  WORKFLOW_VERSION.WID
-                    .eq(widAsUInteger)
-                    .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
-                )
-                .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
-                .limit(1)
-            )
-          )
+        OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(
+          context
+            .select(WORKFLOW_EXECUTIONS.EID)
+            .from(WORKFLOW_EXECUTIONS)
+            .join(WORKFLOW_VERSION)
+            .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
+            .where(
+              WORKFLOW_VERSION.WID
+                .eq(widAsUInteger)
+                .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
+            )
+            .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
+            .limit(1)
+        )
       )
-      .orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID)
       .fetchInto(classOf[WorkflowRuntimeStatistics])
       .asScala
       .toList
     if (rawStats.isEmpty) {
       None
     } else {
       val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) =>
-        val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0)
-        acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime
-          .doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9))
+        val opTotalExecutionTime = acc.getOrElse(stat.operatorId, 0.0)
+        acc + (stat.operatorId -> (opTotalExecutionTime + (stat.dataProcessingTime
+          .doubleValue() + stat.controlProcessingTime.doubleValue()) / 1e9))
       }
       Some(cumulatedStats)
     }
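The fold at the end of this hunk is the cost model itself: per-operator data-processing and control-processing times arrive in nanoseconds and are summed into seconds. A self-contained sketch of that aggregation, with a hypothetical `Stat` case class standing in for the fetched `WorkflowRuntimeStatistics` rows:

```scala
// Hypothetical stand-in for the rows fetched by the join above.
case class Stat(operatorId: String, dataProcessingTime: Long, controlProcessingTime: Long)

val rawStats = List(
  Stat("scan-1", 2000000000L, 500000000L), // 2.5 s of processing
  Stat("scan-1", 1000000000L, 0L),         // later snapshot, accumulated onto scan-1
  Stat("join-2", 4000000000L, 1000000000L) // 5.0 s of processing
)

// Same shape as the foldLeft in the diff: nanoseconds -> seconds via / 1e9.
val cumulated: Map[String, Double] =
  rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) =>
    val total = acc.getOrElse(stat.operatorId, 0.0)
    acc + (stat.operatorId ->
      (total + (stat.dataProcessingTime.toDouble + stat.controlProcessingTime.toDouble) / 1e9))
  }
// cumulated == Map("scan-1" -> 3.5, "join-2" -> 5.0)
```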
ScheduleGenerator.scala

@@ -1,6 +1,7 @@
 package edu.uci.ics.amber.engine.architecture.scheduling
 
-import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
+import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
+import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
 import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
 import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
@@ -151,17 +152,18 @@ abstract class ScheduleGenerator(
       .removeLink(physicalLink)
 
     // create cache writer and link
-    val storageKey = OpResultStorage.createStorageKey(
+    // create the uri of the materialization storage
+    val storageUri = VFSURIFactory.createMaterializedResultURI(
       workflowContext.workflowId,
       workflowContext.executionId,
       physicalLink.fromOpId.logicalOpId,
-      physicalLink.fromPortId,
-      isMaterialized = true
+      physicalLink.fromPortId
     )
 
     val fromPortOutputMode =
       physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
     val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
       workflowContext.workflowId,
       workflowContext.executionId,
-      storageKey,
+      storageUri,
       fromPortOutputMode
     )
     val sourceToWriterLink =
@@ -182,19 +184,15 @@
         ._3
         .toOption
         .get
-    ResultStorage
-      .getOpResultStorage(workflowContext.workflowId)
-      .create(
-        key = storageKey,
-        mode = OpResultStorage.defaultStorageMode,
-        schema = schema
-      )
+    // create the document
+    DocumentFactory.createDocument(storageUri, schema)
+    ExecutionResourcesMapping.addResourceUri(workflowContext.executionId, storageUri)
 
     // create cache reader and link
     val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
       workflowContext.workflowId,
       workflowContext.executionId,
-      storageKey
+      storageUri
    )
     val readerToDestLink =
       PhysicalLink(
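The effect of this refactor is that materialized intermediate results are addressed by URI instead of an opaque storage key: the writer mints the URI, creates the backing document, and registers the URI against the execution for later cleanup; the reader needs only the same URI. A condensed view of the flow, using only names that appear in this diff (not runnable on its own):

```scala
// Writer side: mint the URI, allocate the backing document, track it for cleanup.
val storageUri = VFSURIFactory.createMaterializedResultURI(
  workflowContext.workflowId,
  workflowContext.executionId,
  physicalLink.fromOpId.logicalOpId,
  physicalLink.fromPortId
)
DocumentFactory.createDocument(storageUri, schema)
ExecutionResourcesMapping.addResourceUri(workflowContext.executionId, storageUri)

// Reader side: the same URI is all the cache source needs to locate the document.
val matReaderPhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
  workflowContext.workflowId,
  workflowContext.executionId,
  storageUri
)
```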
InitializeExecutorHandler.scala

@@ -12,6 +12,8 @@ import edu.uci.ics.amber.operator.sink.ProgressiveSinkOpExec
 import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
 import edu.uci.ics.amber.util.VirtualIdentityUtils
 
+import java.net.URI
+
 trait InitializeExecutorHandler {
   this: DataProcessorRPCHandlerInitializer =>
 
@@ -26,15 +28,14 @@ trait InitializeExecutorHandler {
       case OpExecWithClassName(className, descString) =>
         ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
       case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
-      case OpExecSink(storageKey, workflowIdentity, outputMode) =>
+      case OpExecSink(storageUri, workflowIdentity, outputMode) =>
         new ProgressiveSinkOpExec(
           workerIdx,
           outputMode,
-          storageKey,
-          workflowIdentity
+          URI.create(storageUri)
         )
-      case OpExecSource(storageKey, workflowIdentity) =>
-        new CacheSourceOpExec(storageKey, workflowIdentity)
+      case OpExecSource(storageUri, _) =>
+        new CacheSourceOpExec(URI.create(storageUri))
     }
     EmptyReturn()
   }
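Note that `OpExecSink` and `OpExecSource` now carry the storage URI across the RPC boundary as a plain string, which the worker re-parses with `java.net.URI`. A small sketch; the URI text below is hypothetical, the real format comes from `VFSURIFactory`:

```scala
import java.net.URI

// Hypothetical URI text; the actual format is produced by VFSURIFactory.
val storageUriString = "vfs:///wid/1/eid/2/opid/op-3/port/0"

// URI.create throws IllegalArgumentException on malformed input, so a bad
// value fails loudly at executor-initialization time rather than at first read.
val storageUri: URI = URI.create(storageUriString)
```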
AmberConfig.scala

@@ -106,8 +106,6 @@ object AmberConfig {
   val cleanupAllExecutionResults: Boolean =
     getConfSource.getBoolean("web-server.clean-all-execution-results-on-server-start")
 
-  // Language server configuration
-  val pythonLanguageServerConfig: Config = getConfSource.getConfig("python-language-server")
   // Python language server configuration
   var aiAssistantConfig: Option[Config] = None
   if (getConfSource.hasPath("ai-assistant-server")) {
ComputingUnitMaster.scala

@@ -2,7 +2,7 @@ package edu.uci.ics.texera.web
 
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.typesafe.scalalogging.LazyLogging
-import edu.uci.ics.amber.core.storage.result.OpResultStorage
+import edu.uci.ics.amber.core.storage.DocumentFactory
 import edu.uci.ics.amber.core.storage.util.mongo.MongoDatabaseManager
 import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
@@ -179,9 +179,9 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
         val storageType = collection.get("storageType").asText()
         val collectionName = collection.get("storageKey").asText()
         storageType match {
-          case OpResultStorage.ICEBERG =>
+          case DocumentFactory.ICEBERG =>
             // rely on the server-side result cleanup logic.
-          case OpResultStorage.MONGODB =>
+          case DocumentFactory.MONGODB =>
             MongoDatabaseManager.dropCollection(collectionName)
         }
       })
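Here only the home of the constants changes, from `OpResultStorage` to `DocumentFactory`; the cleanup behavior is identical. One observation: the match has no fallback case, so an unknown storage type would throw a `MatchError` at cleanup time. A defensive variant (not part of this commit; `logger` comes from the `LazyLogging` mixin) might look like:

```scala
storageType match {
  case DocumentFactory.ICEBERG =>
    // rely on the server-side result cleanup logic.
  case DocumentFactory.MONGODB =>
    MongoDatabaseManager.dropCollection(collectionName)
  case other =>
    logger.warn(s"unrecognized storage type '$other'; skipping cleanup")
}
```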
DatasetSearchQueryBuilder.scala

@@ -118,7 +118,7 @@ object DatasetSearchQueryBuilder extends SearchQueryBuilder {
         ),
         dataset.getOwnerUid == uid,
         List(),
-        DatasetResource.calculateLatestDatasetVersionSize(dataset.getDid)
+        DatasetResource.calculateDatasetVersionSize(dataset.getDid)
       )
       DashboardClickableFileEntry(
         resourceType = SearchQueryBuilder.DATASET_RESOURCE_TYPE,
DatasetAccessResource.scala

@@ -6,7 +6,6 @@ import edu.uci.ics.texera.dao.SqlServer
 import edu.uci.ics.texera.web.model.common.AccessEntry
 import edu.uci.ics.texera.dao.jooq.generated.Tables.USER
 import edu.uci.ics.texera.dao.jooq.generated.enums.DatasetUserAccessPrivilege
-import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
 import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS
 import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetUserAccessDao, UserDao}
 import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{DatasetUserAccess, User}
@@ -28,36 +27,28 @@ object DatasetAccessResource {
     .createDSLContext()
 
   def userHasReadAccess(ctx: DSLContext, did: UInteger, uid: UInteger): Boolean = {
+    val datasetDao = new DatasetDao(ctx.configuration())
+    val isDatasetPublic = Option(datasetDao.fetchOneByDid(did))
+      .flatMap(dataset => Option(dataset.getIsPublic))
+      .contains(1.toByte)
+
+    isDatasetPublic ||
     userHasWriteAccess(ctx, did, uid) ||
-    datasetIsPublic(ctx, did) ||
     getDatasetUserAccessPrivilege(ctx, did, uid) == DatasetUserAccessPrivilege.READ
   }
 
   def userOwnDataset(ctx: DSLContext, did: UInteger, uid: UInteger): Boolean = {
-    ctx
-      .selectOne()
-      .from(DATASET)
-      .where(DATASET.DID.eq(did))
-      .and(DATASET.OWNER_UID.eq(uid))
-      .fetch()
-      .isNotEmpty
+    val datasetDao = new DatasetDao(ctx.configuration())
+
+    Option(datasetDao.fetchOneByDid(did))
+      .exists(_.getOwnerUid == uid)
   }
 
   def userHasWriteAccess(ctx: DSLContext, did: UInteger, uid: UInteger): Boolean = {
     userOwnDataset(ctx, did, uid) ||
     getDatasetUserAccessPrivilege(ctx, did, uid) == DatasetUserAccessPrivilege.WRITE
   }
 
-  def datasetIsPublic(ctx: DSLContext, did: UInteger): Boolean = {
-    Option(
-      ctx
-        .select(DATASET.IS_PUBLIC)
-        .from(DATASET)
-        .where(DATASET.DID.eq(did))
-        .fetchOneInto(classOf[Boolean])
-    ).getOrElse(false)
-  }
-
   def getDatasetUserAccessPrivilege(
       ctx: DSLContext,
       did: UInteger,
@@ -67,21 +58,23 @@ object DatasetAccessResource {
       ctx
         .select(DATASET_USER_ACCESS.PRIVILEGE)
         .from(DATASET_USER_ACCESS)
-        .where(DATASET_USER_ACCESS.DID.eq(did))
-        .and(DATASET_USER_ACCESS.UID.eq(uid))
-        .fetchOne()
-    )
-      .map(_.getValue(DATASET_USER_ACCESS.PRIVILEGE))
-      .getOrElse(DatasetUserAccessPrivilege.NONE)
+        .where(
+          DATASET_USER_ACCESS.DID
+            .eq(did)
+            .and(DATASET_USER_ACCESS.UID.eq(uid))
+        )
+        .fetchOneInto(classOf[DatasetUserAccessPrivilege])
+    ).getOrElse(DatasetUserAccessPrivilege.NONE)
   }
 
   def getOwner(ctx: DSLContext, did: UInteger): User = {
-    val ownerUid = ctx
-      .select(DATASET.OWNER_UID)
-      .from(DATASET)
-      .where(DATASET.DID.eq(did))
-      .fetchOneInto(classOf[UInteger])
-    new UserDao(ctx.configuration()).fetchOneByUid(ownerUid)
+    val datasetDao = new DatasetDao(ctx.configuration())
+    val userDao = new UserDao(ctx.configuration())
+
+    Option(datasetDao.fetchOneByDid(did))
+      .flatMap(dataset => Option(dataset.getOwnerUid))
+      .map(ownerUid => userDao.fetchOneByUid(ownerUid))
+      .orNull
   }
 }
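Throughout this file the refactor replaces hand-written jOOQ queries against `DATASET` with generated DAO lookups, and it leans on one recurring idiom: DAO fetches return `null` when no row matches, so results are wrapped in `Option` before use. A minimal sketch of the idiom, reusing the `datasetDao` and `did` names from the diff:

```scala
// fetchOneByDid returns null when the dataset row is absent; Option(...) makes
// that explicit and lets the null checks compose.
val ownerUid: Option[UInteger] =
  Option(datasetDao.fetchOneByDid(did))              // None if no such dataset
    .flatMap(dataset => Option(dataset.getOwnerUid)) // None if owner_uid is NULL
```

The access checks then compose on top of these lookups: read access is public, write access, or an explicit READ grant; write access is ownership or an explicit WRITE grant.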