Merge branch 'master' into xinyuan-basic-for-loop
aglinxinyuan authored Jan 16, 2025
2 parents f35ab8c + c5c0701 commit ea5e8c3
Showing 52 changed files with 886 additions and 583 deletions.
9 changes: 0 additions & 9 deletions core/amber/src/main/resources/application.conf
@@ -80,15 +80,6 @@ schedule-generator {
search-timeout-milliseconds = 1000
}

python-language-server{
provider = "pyright" # valid options: ["pyright", "pylsp"]
port = 3000
# Maximum number of retries for starting the language server.
retry-counts = 3
# Time in milliseconds to wait between retry attempts when starting the language server
wait-time-ms = 200
}

ai-assistant-server{
assistant = "none"
# Put your AI service authentication key here
@@ -1,6 +1,7 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
@@ -151,17 +152,18 @@ abstract class ScheduleGenerator(
.removeLink(physicalLink)

// create cache writer and link
val storageKey = OpResultStorage.createStorageKey(
// create the URI of the materialization storage
val storageUri = VFSURIFactory.createMaterializedResultURI(
workflowContext.workflowId,
workflowContext.executionId,
physicalLink.fromOpId.logicalOpId,
physicalLink.fromPortId,
isMaterialized = true
physicalLink.fromPortId
)

val fromPortOutputMode =
physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
storageKey,
storageUri,
fromPortOutputMode
)
val sourceToWriterLink =
@@ -182,19 +184,15 @@
._3
.toOption
.get
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = storageKey,
mode = OpResultStorage.defaultStorageMode,
schema = schema
)
// create the document
DocumentFactory.createDocument(storageUri, schema)
ExecutionResourcesMapping.addResourceUri(workflowContext.executionId, storageUri)

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
storageKey
storageUri
)
val readerToDestLink =
PhysicalLink(
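Taken together, the hunks above replace string storage keys (OpResultStorage.createStorageKey) with VFS URIs: the writer side mints a URI, backs it with a document, and registers it against the execution for later cleanup, while the reader side only needs that same URI. A minimal sketch of the round trip, assuming the factory signatures shown in this diff (workflowId, executionId, logicalOpId, portId, and schema are stand-ins):

import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping

// writer side: mint the URI, create the backing document, register it for cleanup
val uri = VFSURIFactory.createMaterializedResultURI(workflowId, executionId, logicalOpId, portId)
DocumentFactory.createDocument(uri, schema)
ExecutionResourcesMapping.addResourceUri(executionId, uri)

// reader side: the cache source operator is rebuilt from the same URI
val reader = SpecialPhysicalOpFactory.newSourcePhysicalOp(workflowId, executionId, uri)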
@@ -12,6 +12,8 @@ import edu.uci.ics.amber.operator.sink.ProgressiveSinkOpExec
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.util.VirtualIdentityUtils

import java.net.URI

trait InitializeExecutorHandler {
this: DataProcessorRPCHandlerInitializer =>

@@ -26,15 +28,14 @@ trait InitializeExecutorHandler {
case OpExecWithClassName(className, descString) =>
ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
case OpExecSink(storageUri, workflowIdentity, outputMode) =>
new ProgressiveSinkOpExec(
workerIdx,
outputMode,
storageKey,
workflowIdentity
URI.create(storageUri)
)
case OpExecSource(storageKey, workflowIdentity) =>
new CacheSourceOpExec(storageKey, workflowIdentity)
case OpExecSource(storageUri, _) =>
new CacheSourceOpExec(URI.create(storageUri))
}
EmptyReturn()
}
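The executor descriptors now carry the storage location as a plain string (presumably so it serializes cleanly across the RPC boundary), and the handler parses it back with java.net.URI.create. Note that URI.create wraps new URI(...) and throws IllegalArgumentException on malformed input, so a bad location string fails fast at executor initialization. A tiny sketch (the URI value is hypothetical):

import java.net.URI

// parse the serialized location back into a URI; malformed input throws IllegalArgumentException
val storageUri: URI = URI.create("vfs:///w-1/e-1/op-3/port-0/materialized") // hypothetical shape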
@@ -106,8 +106,6 @@ object AmberConfig {
val cleanupAllExecutionResults: Boolean =
getConfSource.getBoolean("web-server.clean-all-execution-results-on-server-start")

// Language server configuration
val pythonLanguageServerConfig: Config = getConfSource.getConfig("python-language-server")
// AI assistant configuration
var aiAssistantConfig: Option[Config] = None
if (getConfSource.hasPath("ai-assistant-server")) {
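With pythonLanguageServerConfig removed, ai-assistant-server is the remaining optional section, which AmberConfig guards with hasPath before reading. A minimal sketch of that pattern using Typesafe Config (ConfigFactory.load stands in for getConfSource, which is an assumption here):

import com.typesafe.config.{Config, ConfigFactory}

// optional sections are read defensively: a missing path yields None instead of an exception
val conf: Config = ConfigFactory.load()
val aiAssistantConfig: Option[Config] =
  if (conf.hasPath("ai-assistant-server")) Some(conf.getConfig("ai-assistant-server")) else None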
@@ -2,7 +2,7 @@ package edu.uci.ics.texera.web

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.result.OpResultStorage
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.util.mongo.MongoDatabaseManager
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
@@ -179,9 +179,9 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.ICEBERG =>
case DocumentFactory.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
case DocumentFactory.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
}
})
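The cleanup dispatch above now matches on the renamed DocumentFactory constants. A string match with no default arm throws scala.MatchError on unexpected values; a defensive variant (the default arm is illustrative and not part of this commit) could look like:

storageType match {
  case DocumentFactory.ICEBERG => // rely on the server-side result cleanup logic
  case DocumentFactory.MONGODB => MongoDatabaseManager.dropCollection(collectionName)
  case other                   => logger.warn(s"unknown storage type: $other") // illustrative guard only
}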
@@ -6,12 +6,13 @@ import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.dao.jooq.generated.Tables._
import edu.uci.ics.texera.web.resource.dashboard.admin.execution.AdminExecutionResource._
import io.dropwizard.auth.Auth
import org.jooq.impl.DSL
import org.jooq.types.UInteger

import javax.annotation.security.RolesAllowed
import javax.ws.rs._
import javax.ws.rs.core.MediaType
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.jdk.CollectionConverters._

/**
* This file handles various requests related to saved executions.
@@ -48,49 +49,130 @@ object AdminExecutionResource {
}
}

def mapToStatus(status: String): Int = {
status match {
case "READY" => 0
case "RUNNING" => 1
case "PAUSED" => 2
case "COMPLETED" => 3
case "FAILED" => 4
case "KILLED" => 5
case _ => -1 // or throw an exception, depending on your needs
}
}

val sortFieldMapping = Map(
"workflow_name" -> WORKFLOW.NAME,
"execution_name" -> WORKFLOW_EXECUTIONS.NAME,
"initiator" -> USER.NAME,
"end_time" -> WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME
)

}

@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/admin/execution")
@RolesAllowed(Array("ADMIN"))
class AdminExecutionResource {

@GET
@Path("/totalWorkflow")
@Produces()
def getTotalWorkflows: Int = {
context
.select(
DSL.countDistinct(WORKFLOW.WID)
)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
.join(USER)
.on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
.join(WORKFLOW)
.on(WORKFLOW.WID.eq(WORKFLOW_VERSION.WID))
.fetchOne(0, classOf[Int])
}

/**
* This method retrieves all existing executions
* This method retrieves the latest execution of each workflow for the specified page.
* The returned executions are sorted and filtered according to the parameters.
*/
@GET
@Path("/executionList")
@Path("/executionList/{pageSize}/{pageIndex}/{sortField}/{sortDirection}")
@Produces(Array(MediaType.APPLICATION_JSON))
def listWorkflows(@Auth current_user: SessionUser): List[dashboardExecution] = {
val workflowEntries = context
def listWorkflows(
@Auth current_user: SessionUser,
@PathParam("pageSize") page_size: Int = 20,
@PathParam("pageIndex") page_index: Int = 0,
@PathParam("sortField") sortField: String = "end_time",
@PathParam("sortDirection") sortDirection: String = "desc",
@QueryParam("filter") filter: java.util.List[String]
): List[dashboardExecution] = {
val filter_status = filter.asScala.map(mapToStatus).toSeq.filter(_ != -1).asJava

// Base query that retrieves the latest execution of each workflow, before sorting or filtering.
// Pagination (pageSize and pageIndex) is applied after the filter and sort below.
val executions_base_query = context
.select(
WORKFLOW_EXECUTIONS.UID,
USER.NAME,
WORKFLOW_VERSION.WID,
WORKFLOW.NAME,
WORKFLOW_EXECUTIONS.EID,
WORKFLOW_EXECUTIONS.VID,
WORKFLOW_EXECUTIONS.STARTING_TIME,
WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME,
WORKFLOW_EXECUTIONS.STATUS,
WORKFLOW_EXECUTIONS.NAME
)
.from(WORKFLOW_EXECUTIONS)
.leftJoin(WORKFLOW_VERSION)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
.leftJoin(USER)
.join(USER)
.on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
.leftJoin(WORKFLOW)
.join(WORKFLOW)
.on(WORKFLOW.WID.eq(WORKFLOW_VERSION.WID))
.fetch()
.naturalJoin(
context
.select(
DSL.max(WORKFLOW_EXECUTIONS.EID).as("eid")
)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.groupBy(WORKFLOW_VERSION.WID)
)

// Apply the status filter if it is not empty.
val executions_apply_filter = if (!filter_status.isEmpty) {
executions_base_query.where(WORKFLOW_EXECUTIONS.STATUS.in(filter_status))
} else {
executions_base_query
}

// Apply sorting if the user specified a sort field.
var executions_apply_order =
executions_apply_filter.limit(page_size).offset(page_index * page_size)
if (sortField != "NO_SORTING") {
executions_apply_order = executions_apply_filter
.orderBy(
if (sortDirection == "desc") sortFieldMapping.getOrElse(sortField, WORKFLOW.NAME).desc()
else sortFieldMapping.getOrElse(sortField, WORKFLOW.NAME).asc()
)
.limit(page_size)
.offset(page_index * page_size)
}

val executions = executions_apply_order.fetch()

// Retrieve the id of each workflow that the user has access to.
val availableWorkflowIds = context
.select(WORKFLOW_USER_ACCESS.WID)
.from(WORKFLOW_USER_ACCESS)
.where(WORKFLOW_USER_ACCESS.UID.eq(current_user.getUid))
.fetchInto(classOf[UInteger])

workflowEntries
// Calculate the statistics needed for each execution.
executions
.map(workflowRecord => {
val startingTime =
workflowRecord.get(WORKFLOW_EXECUTIONS.STARTING_TIME).getTime
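The new route pages, sorts, and filters on the server side. A request for the first page of 20 results, sorted by end time descending and restricted to running or failed executions, might look like this (hypothetical values; the filter query parameter repeats):

GET /admin/execution/executionList/20/0/end_time/desc?filter=RUNNING&filter=FAILED

On the server, the repeated filter values pass through mapToStatus, which maps known labels to the integer codes stored in WORKFLOW_EXECUTIONS.STATUS and drops anything unrecognized:

// usage sketch of mapToStatus from the object above: unknown labels become -1 and are dropped
val statuses = Seq("RUNNING", "FAILED", "bogus").map(mapToStatus).filter(_ != -1)
// statuses == Seq(1, 4)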
@@ -6,16 +6,14 @@ import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.model.common.AccessEntry
import edu.uci.ics.texera.dao.jooq.generated.Tables.USER
import edu.uci.ics.texera.dao.jooq.generated.enums.DatasetUserAccessPrivilege
import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetUserAccessDao, UserDao}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{DatasetUserAccess, User}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{
context,
getOwner
}
import org.jooq.{Condition, DSLContext}
import org.jooq.impl.DSL
import org.jooq.DSLContext
import org.jooq.types.UInteger

import java.util
Expand Down
@@ -1,7 +1,6 @@
package edu.uci.ics.texera.web.resource.dashboard.user.dataset

import edu.uci.ics.amber.core.storage.{FileResolver, StorageConfig}
import edu.uci.ics.amber.core.storage.model.DatasetFileDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, FileResolver, StorageConfig}
import edu.uci.ics.amber.core.storage.util.dataset.{
GitVersionControlLocalFileStorage,
PhysicalFileNode
@@ -26,7 +25,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
DatasetVersion,
User
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{context, _}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.DatasetFileNode
import io.dropwizard.auth.Auth
@@ -283,7 +282,7 @@ object DatasetResource {
}

datasetOperation.filesToRemove.foreach { fileUri =>
new DatasetFileDocument(fileUri).clear()
DocumentFactory.openDocument(fileUri)._1.clear()
}
}
)
@@ -820,7 +819,7 @@ class DatasetResource {
val fileUri = FileResolver.resolve(decodedPathStr)
val streamingOutput = new StreamingOutput() {
override def write(output: OutputStream): Unit = {
val inputStream = new DatasetFileDocument(fileUri).asInputStream()
val inputStream = DocumentFactory.openReadonlyDocument(fileUri).asInputStream()
try {
val buffer = new Array[Byte](8192) // buffer size
var bytesRead = inputStream.read(buffer)
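Both call sites above now route through DocumentFactory instead of constructing a DatasetFileDocument directly: openDocument(fileUri) returns a tuple whose first element is the document (hence ._1.clear()), while openReadonlyDocument(fileUri) yields a read-only view for streaming. A sketch of the streaming path with explicit cleanup, mirroring the write loop in the hunk above (output and fileUri are assumed to be in scope):

// stream the dataset file to the client, closing the input even if the write fails midway
val in = DocumentFactory.openReadonlyDocument(fileUri).asInputStream()
try {
  val buffer = new Array[Byte](8192)
  var bytesRead = in.read(buffer)
  while (bytesRead != -1) {
    output.write(buffer, 0, bytesRead)
    bytesRead = in.read(buffer)
  }
} finally {
  in.close()
}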