Skip to content

Commit

Permalink
Add URI generator, resolver and corresponding document open & create …
Browse files Browse the repository at this point in the history
…functions (#3211)

This PR refactor the storage key by representing it using the storage
URI. And this PR also adds the `openDocument` and `createDocument`
function using the given URI.

## Major Changes

### 1. VFS URI resource type definition, resolve and decode functions
Two types of VFS resources are defined:
```
object VFSResourceType extends Enumeration {
  val RESULT: Value = Value("result")
  val MATERIALIZED_RESULT: Value = Value("materializedResult")
}
```

Two defs are added to the `FileResolver`

- `resolve`: create the URI pointing to the storage resource on the VFS
```java
/**
    * Resolve a VFS resource to its URI. The URI can be used by the DocumentFactory to create resource or open resource
    *
    * @param resourceType   The type of the VFS resource.
    * @param workflowId     Workflow identifier.
    * @param executionId    Execution identifier.
    * @param operatorId     Operator identifier.
    * @param portIdentity   Optional port identifier. **Required** if `resourceType` is `RESULT` or `MATERIALIZED_RESULT`.
    * @return A VFS URI
    * @throws IllegalArgumentException if `resourceType` is `RESULT` but `portIdentity` is missing.
    */
  def resolve(
      resourceType: VFSResourceType.Value,
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity,
      operatorId: OperatorIdentity,
      portIdentity: Option[PortIdentity] = None
  ): URI
```

- `decodeVFSUri`: decode a VFS URI to components
```java
  /**
    * Parses a VFS URI and extracts its components
    *
    * @param uri The VFS URI to parse.
    * @return A `VFSUriComponents` object with the extracted data.
    * @throws IllegalArgumentException if the URI is malformed.
    */
  def decodeVFSUri(uri: URI): (
      WorkflowIdentity,
      ExecutionIdentity,
      OperatorIdentity,
      Option[PortIdentity],
      VFSResourceType.Value
  )
```


### 2. `createDocument` and `openDocument` functions to the
`DocumentFactory`

`createDocument` and `openDocument` defs to create/open a storage
resource pointed by the URI

- `DocumentFactory.createDocument`
```java
  /**
    * Create a document for storage specified by the uri.
    * This document is suitable for storing structural data, i.e. the schema is required to create such document.
    * @param uri the location of the document
    * @param schema the schema of the data stored in the document
    * @return the created document
    */
  def createDocument(uri: URI, schema: Schema): VirtualDocument[_]
```

- `DocumentFactory.openDocument`
```java
  /**
    * Open a document specified by the uri.
    * The document should be storing the structural data as the document and the schema will be returned
    * @param uri the uri of the document
    * @return the VirtualDocument, which is the handler of the data; the Schema, which is the schema of the data stored in the document
    */
  def openDocument(uri: URI): (VirtualDocument[_], Schema)
```
  • Loading branch information
bobbai00 authored Jan 15, 2025
1 parent 9b9c200 commit d4d4176
Show file tree
Hide file tree
Showing 42 changed files with 647 additions and 424 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
Expand Down Expand Up @@ -151,17 +153,18 @@ abstract class ScheduleGenerator(
.removeLink(physicalLink)

// create cache writer and link
val storageKey = OpResultStorage.createStorageKey(
// create the uri of the materialization storage
val storageUri = VFSURIFactory.createMaterializedResultURI(
workflowContext.workflowId,
workflowContext.executionId,
physicalLink.fromOpId.logicalOpId,
physicalLink.fromPortId,
isMaterialized = true
physicalLink.fromPortId
)

val fromPortOutputMode =
physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
storageKey,
storageUri,
fromPortOutputMode
)
val sourceToWriterLink =
Expand All @@ -182,19 +185,15 @@ abstract class ScheduleGenerator(
._3
.toOption
.get
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = storageKey,
mode = OpResultStorage.defaultStorageMode,
schema = schema
)
// create the document
DocumentFactory.createDocument(storageUri, schema)
ExecutionResourcesMapping.addResourceUri(workflowContext.executionId, storageUri)

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
storageKey
storageUri
)
val readerToDestLink =
PhysicalLink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import edu.uci.ics.amber.operator.sink.ProgressiveSinkOpExec
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.util.VirtualIdentityUtils

import java.net.URI

trait InitializeExecutorHandler {
this: DataProcessorRPCHandlerInitializer =>

Expand All @@ -26,15 +28,14 @@ trait InitializeExecutorHandler {
case OpExecWithClassName(className, descString) =>
ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
case OpExecSink(storageUri, workflowIdentity, outputMode) =>
new ProgressiveSinkOpExec(
workerIdx,
outputMode,
storageKey,
workflowIdentity
URI.create(storageUri)
)
case OpExecSource(storageKey, workflowIdentity) =>
new CacheSourceOpExec(storageKey, workflowIdentity)
case OpExecSource(storageUri, _) =>
new CacheSourceOpExec(URI.create(storageUri))
}
EmptyReturn()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package edu.uci.ics.texera.web

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.result.OpResultStorage
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.util.mongo.MongoDatabaseManager
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
Expand Down Expand Up @@ -179,9 +179,9 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.ICEBERG =>
case DocumentFactory.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
case DocumentFactory.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
}
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package edu.uci.ics.texera.web.resource.dashboard.user.dataset

import edu.uci.ics.amber.core.storage.{FileResolver, StorageConfig}
import edu.uci.ics.amber.core.storage.model.DatasetFileDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, FileResolver, StorageConfig}
import edu.uci.ics.amber.core.storage.util.dataset.{
GitVersionControlLocalFileStorage,
PhysicalFileNode
Expand Down Expand Up @@ -283,7 +282,7 @@ object DatasetResource {
}

datasetOperation.filesToRemove.foreach { fileUri =>
new DatasetFileDocument(fileUri).clear()
DocumentFactory.openDocument(fileUri)._1.clear()
}
}
)
Expand Down Expand Up @@ -820,7 +819,7 @@ class DatasetResource {
val fileUri = FileResolver.resolve(decodedPathStr)
val streamingOutput = new StreamingOutput() {
override def write(output: OutputStream): Unit = {
val inputStream = new DatasetFileDocument(fileUri).asInputStream()
val inputStream = DocumentFactory.openReadonlyDocument(fileUri).asInputStream()
try {
val buffer = new Array[Byte](8192) // buffer size
var bytesRead = inputStream.read(buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord}
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity}
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.dao.jooq.generated.Tables.{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import akka.actor.Cancellable
import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.result.OpResultStorage.MONGODB
import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT}
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result._
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
Expand All @@ -19,7 +21,11 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
WorkflowIdentity
}
import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.texera.web.SubscriptionManager
Expand All @@ -29,7 +35,10 @@ import edu.uci.ics.texera.web.model.websocket.event.{
WebResultUpdateEvent
}
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import org.jooq.types.UInteger

import java.util.UUID
import scala.collection.mutable
Expand Down Expand Up @@ -60,6 +69,7 @@ object ExecutionResultService {
*/
private def convertWebResultUpdate(
workflowIdentity: WorkflowIdentity,
executionId: ExecutionIdentity,
physicalOps: List[PhysicalOp],
oldTupleCount: Int,
newTupleCount: Int
Expand All @@ -85,10 +95,14 @@ object ExecutionResultService {
}
}

val storage =
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(OpResultStorage.createStorageKey(physicalOps.head.id.logicalOpId, PortIdentity()))
val storageUri = VFSURIFactory.createResultURI(
workflowIdentity,
executionId,
physicalOps.head.id.logicalOpId,
PortIdentity()
)
val storage: VirtualDocument[Tuple] =
DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]]
val webUpdate = webOutputMode match {
case PaginationMode() =>
val numTuples = storage.getCount
Expand Down Expand Up @@ -167,11 +181,11 @@ class ExecutionResultService(
private var resultUpdateCancellable: Cancellable = _

def attachToExecution(
executionId: ExecutionIdentity,
stateStore: ExecutionStateStore,
physicalPlan: PhysicalPlan,
client: AmberClient
): Unit = {

if (resultUpdateCancellable != null && !resultUpdateCancellable.isCancelled) {
resultUpdateCancellable.cancel()
}
Expand All @@ -188,7 +202,7 @@ class ExecutionResultService(
2.seconds,
resultPullingFrequency.seconds
) {
onResultUpdate(physicalPlan)
onResultUpdate(executionId, physicalPlan)
}
}
} else {
Expand All @@ -204,7 +218,7 @@ class ExecutionResultService(
logger.info("Workflow execution terminated. Stop update results.")
if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) {
// immediately perform final update
onResultUpdate(physicalPlan)
onResultUpdate(executionId, physicalPlan)
}
}
})
Expand Down Expand Up @@ -233,16 +247,20 @@ class ExecutionResultService(
val oldInfo = oldState.resultInfo.getOrElse(opId, OperatorResultMetadata())
buf(opId.id) = ExecutionResultService.convertWebResultUpdate(
workflowIdentity,
executionId,
physicalPlan.getPhysicalOpsOfLogicalOp(opId),
oldInfo.tupleCount,
info.tupleCount
)
if (StorageConfig.resultStorageMode == MONGODB) {
// using the first port for now. TODO: support multiple ports
val storageKey = OpResultStorage.createStorageKey(opId, PortIdentity())
val opStorage = ResultStorage
.getOpResultStorage(workflowIdentity)
.get(storageKey)
val storageUri = VFSURIFactory.createResultURI(
workflowIdentity,
executionId,
opId,
PortIdentity()
)
val opStorage = DocumentFactory.openDocument(storageUri)._1
opStorage match {
case mongoDocument: MongoDocument[Tuple] =>
val tableCatStats = mongoDocument.getCategoricalStats
Expand Down Expand Up @@ -278,14 +296,21 @@ class ExecutionResultService(
def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
// calculate from index (pageIndex starts from 1 instead of 0)
val from = request.pageSize * (request.pageIndex - 1)

val latestExecutionId = getLatestExecutionId(workflowIdentity).getOrElse(
throw new IllegalStateException("No execution is recorded")
)
// using the first port for now. TODO: support multiple ports
val storageKey =
OpResultStorage.createStorageKey(OperatorIdentity(request.operatorID), PortIdentity())
val storageUri = VFSURIFactory.createResultURI(
workflowIdentity,
latestExecutionId,
OperatorIdentity(request.operatorID),
PortIdentity()
)
val paginationIterable = {
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(storageKey)
DocumentFactory
.openDocument(storageUri)
._1
.asInstanceOf[VirtualDocument[Tuple]]
.getRange(from, from + request.pageSize)
.to(Iterable)
}
Expand All @@ -298,26 +323,24 @@ class ExecutionResultService(
PaginatedResultEvent.apply(request, mappedResults, attributes)
}

private def onResultUpdate(physicalPlan: PhysicalPlan): Unit = {
private def onResultUpdate(executionId: ExecutionIdentity, physicalPlan: PhysicalPlan): Unit = {
workflowStateStore.resultStore.updateState { _ =>
val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = {
ResultStorage
.getOpResultStorage(workflowIdentity)
.getAllKeys
.filter(!_.startsWith("materialized_"))
.map(storageKey => {
val count = ResultStorage
.getOpResultStorage(workflowIdentity)
.get(storageKey)
.getCount
.toInt

val (opId, storagePortId) = OpResultStorage.decodeStorageKey(storageKey)
ExecutionResourcesMapping
.getResourceURIs(executionId)
.filter(uri => {
val (_, _, _, _, resourceType) = VFSURIFactory.decodeURI(uri)
resourceType != MATERIALIZED_RESULT
})
.map(uri => {
val count = DocumentFactory.openDocument(uri)._1.getCount.toInt

val (_, _, opId, storagePortId, _) = VFSURIFactory.decodeURI(uri)

// Retrieve the mode of the specified output port
val mode = physicalPlan
.getPhysicalOpsOfLogicalOp(opId)
.flatMap(_.outputPorts.get(storagePortId))
.flatMap(_.outputPorts.get(storagePortId.get))
.map(_._1.mode)
.head

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import com.google.api.services.drive.Drive
import com.google.api.services.drive.model.{File, FileList, Permission}
import com.google.api.services.sheets.v4.Sheets
import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange}
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.engine.common.Utils.retry
import edu.uci.ics.amber.util.PathUtils
Expand All @@ -22,6 +23,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowVersionRe
import org.jooq.types.UInteger
import edu.uci.ics.amber.util.ArrowUtils
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId

import java.io.{PipedInputStream, PipedOutputStream}
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -56,7 +58,6 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
import ResultExportService._

private val cache = new mutable.HashMap[String, String]

def exportResult(
user: User,
request: ResultExportRequest
Expand All @@ -71,11 +72,17 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {

// By now the workflow should finish running
// Only supports external port 0 for now. TODO: support multiple ports
val storageUri = VFSURIFactory.createResultURI(
workflowIdentity,
getLatestExecutionId(workflowIdentity).getOrElse(
return ResultExportResponse("error", "The workflow contains no results")
),
OperatorIdentity(request.operatorId),
PortIdentity()
)
val operatorResult: VirtualDocument[Tuple] =
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(OpResultStorage.createStorageKey(OperatorIdentity(request.operatorId), PortIdentity()))
if (operatorResult == null) {
DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]]
if (operatorResult.getCount == 0) {
return ResultExportResponse("error", "The workflow contains no results")
}

Expand Down
Loading

0 comments on commit d4d4176

Please sign in to comment.