diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index cc4a84e1a24..00428d66fc3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -1,6 +1,8 @@ package edu.uci.ics.amber.engine.architecture.scheduling -import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} +import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} +import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT +import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ @@ -151,17 +153,18 @@ abstract class ScheduleGenerator( .removeLink(physicalLink) // create cache writer and link - val storageKey = OpResultStorage.createStorageKey( + // create the uri of the materialization storage + val storageUri = VFSURIFactory.createMaterializedResultURI( + workflowContext.workflowId, + workflowContext.executionId, physicalLink.fromOpId.logicalOpId, - physicalLink.fromPortId, - isMaterialized = true + physicalLink.fromPortId ) + val fromPortOutputMode = physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( - workflowContext.workflowId, - workflowContext.executionId, - storageKey, + storageUri, fromPortOutputMode ) val sourceToWriterLink = @@ -182,19 +185,15 @@ abstract class ScheduleGenerator( ._3 .toOption .get - ResultStorage - .getOpResultStorage(workflowContext.workflowId) - .create( - key = storageKey, - mode = OpResultStorage.defaultStorageMode, - schema = schema - ) + // create the document + DocumentFactory.createDocument(storageUri, schema) + ExecutionResourcesMapping.addResourceUri(workflowContext.executionId, storageUri) // create cache reader and link val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp( workflowContext.workflowId, workflowContext.executionId, - storageKey + storageUri ) val readerToDestLink = PhysicalLink( diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index d5f4bc1064b..18cc3e9167d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -12,6 +12,8 @@ import edu.uci.ics.amber.operator.sink.ProgressiveSinkOpExec import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.util.VirtualIdentityUtils +import java.net.URI + trait InitializeExecutorHandler { this: DataProcessorRPCHandlerInitializer => @@ -26,15 +28,14 @@ trait InitializeExecutorHandler { case OpExecWithClassName(className, descString) => ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount) case 
OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code) - case OpExecSink(storageKey, workflowIdentity, outputMode) => + case OpExecSink(storageUri, workflowIdentity, outputMode) => new ProgressiveSinkOpExec( workerIdx, outputMode, - storageKey, - workflowIdentity + URI.create(storageUri) ) - case OpExecSource(storageKey, workflowIdentity) => - new CacheSourceOpExec(storageKey, workflowIdentity) + case OpExecSource(storageUri, _) => + new CacheSourceOpExec(URI.create(storageUri)) } EmptyReturn() } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index 92911a1fc56..a46bc20bb84 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -2,7 +2,7 @@ package edu.uci.ics.texera.web import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.storage.result.OpResultStorage +import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.util.mongo.MongoDatabaseManager import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig @@ -179,9 +179,9 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with val storageType = collection.get("storageType").asText() val collectionName = collection.get("storageKey").asText() storageType match { - case OpResultStorage.ICEBERG => + case DocumentFactory.ICEBERG => // rely on the server-side result cleanup logic. - case OpResultStorage.MONGODB => + case DocumentFactory.MONGODB => MongoDatabaseManager.dropCollection(collectionName) } }) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala index 9a319c308ba..fd3bfaf796d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala @@ -1,7 +1,6 @@ package edu.uci.ics.texera.web.resource.dashboard.user.dataset -import edu.uci.ics.amber.core.storage.{FileResolver, StorageConfig} -import edu.uci.ics.amber.core.storage.model.DatasetFileDocument +import edu.uci.ics.amber.core.storage.{DocumentFactory, FileResolver, StorageConfig} import edu.uci.ics.amber.core.storage.util.dataset.{ GitVersionControlLocalFileStorage, PhysicalFileNode @@ -283,7 +282,7 @@ object DatasetResource { } datasetOperation.filesToRemove.foreach { fileUri => - new DatasetFileDocument(fileUri).clear() + DocumentFactory.openDocument(fileUri)._1.clear() } } ) @@ -820,7 +819,7 @@ class DatasetResource { val fileUri = FileResolver.resolve(decodedPathStr) val streamingOutput = new StreamingOutput() { override def write(output: OutputStream): Unit = { - val inputStream = new DatasetFileDocument(fileUri).asInputStream() + val inputStream = DocumentFactory.openReadonlyDocument(fileUri).asInputStream() try { val buffer = new Array[Byte](8192) // buffer size var bytesRead = inputStream.read(buffer) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index ab6cec64c59..317930056ca 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -4,6 +4,7 @@ import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord} import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity} +import edu.uci.ics.amber.engine.common.AmberConfig import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.web.auth.SessionUser import edu.uci.ics.texera.dao.jooq.generated.Tables.{ diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 11c0002a311..09163a3377f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -4,8 +4,10 @@ import akka.actor.Cancellable import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName} import com.fasterxml.jackson.databind.node.ObjectNode import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.storage.StorageConfig -import edu.uci.ics.amber.core.storage.result.OpResultStorage.MONGODB +import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB +import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT} +import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} import edu.uci.ics.amber.core.storage.result._ import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan} @@ -19,7 +21,11 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime} -import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.virtualidentity.{ + ExecutionIdentity, + OperatorIdentity, + WorkflowIdentity +} import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode import edu.uci.ics.amber.core.workflow.PortIdentity import edu.uci.ics.texera.web.SubscriptionManager @@ -29,7 +35,10 @@ import edu.uci.ics.texera.web.model.websocket.event.{ WebResultUpdateEvent } import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource +import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} +import org.jooq.types.UInteger import java.util.UUID import scala.collection.mutable @@ -60,6 +69,7 @@ object ExecutionResultService { */ private def convertWebResultUpdate( workflowIdentity: WorkflowIdentity, + executionId: ExecutionIdentity, physicalOps: List[PhysicalOp], oldTupleCount: Int, newTupleCount: Int @@ -85,10 +95,14 @@ object 
ExecutionResultService { } } - val storage = - ResultStorage - .getOpResultStorage(workflowIdentity) - .get(OpResultStorage.createStorageKey(physicalOps.head.id.logicalOpId, PortIdentity())) + val storageUri = VFSURIFactory.createResultURI( + workflowIdentity, + executionId, + physicalOps.head.id.logicalOpId, + PortIdentity() + ) + val storage: VirtualDocument[Tuple] = + DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]] val webUpdate = webOutputMode match { case PaginationMode() => val numTuples = storage.getCount @@ -167,11 +181,11 @@ class ExecutionResultService( private var resultUpdateCancellable: Cancellable = _ def attachToExecution( + executionId: ExecutionIdentity, stateStore: ExecutionStateStore, physicalPlan: PhysicalPlan, client: AmberClient ): Unit = { - if (resultUpdateCancellable != null && !resultUpdateCancellable.isCancelled) { resultUpdateCancellable.cancel() } @@ -188,7 +202,7 @@ class ExecutionResultService( 2.seconds, resultPullingFrequency.seconds ) { - onResultUpdate(physicalPlan) + onResultUpdate(executionId, physicalPlan) } } } else { @@ -204,7 +218,7 @@ class ExecutionResultService( logger.info("Workflow execution terminated. Stop update results.") if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) { // immediately perform final update - onResultUpdate(physicalPlan) + onResultUpdate(executionId, physicalPlan) } } }) @@ -233,16 +247,20 @@ class ExecutionResultService( val oldInfo = oldState.resultInfo.getOrElse(opId, OperatorResultMetadata()) buf(opId.id) = ExecutionResultService.convertWebResultUpdate( workflowIdentity, + executionId, physicalPlan.getPhysicalOpsOfLogicalOp(opId), oldInfo.tupleCount, info.tupleCount ) if (StorageConfig.resultStorageMode == MONGODB) { // using the first port for now. TODO: support multiple ports - val storageKey = OpResultStorage.createStorageKey(opId, PortIdentity()) - val opStorage = ResultStorage - .getOpResultStorage(workflowIdentity) - .get(storageKey) + val storageUri = VFSURIFactory.createResultURI( + workflowIdentity, + executionId, + opId, + PortIdentity() + ) + val opStorage = DocumentFactory.openDocument(storageUri)._1 opStorage match { case mongoDocument: MongoDocument[Tuple] => val tableCatStats = mongoDocument.getCategoricalStats @@ -278,14 +296,21 @@ class ExecutionResultService( def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = { // calculate from index (pageIndex starts from 1 instead of 0) val from = request.pageSize * (request.pageIndex - 1) - + val latestExecutionId = getLatestExecutionId(workflowIdentity).getOrElse( + throw new IllegalStateException("No execution is recorded") + ) // using the first port for now. 
TODO: support multiple ports - val storageKey = - OpResultStorage.createStorageKey(OperatorIdentity(request.operatorID), PortIdentity()) + val storageUri = VFSURIFactory.createResultURI( + workflowIdentity, + latestExecutionId, + OperatorIdentity(request.operatorID), + PortIdentity() + ) val paginationIterable = { - ResultStorage - .getOpResultStorage(workflowIdentity) - .get(storageKey) + DocumentFactory + .openDocument(storageUri) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] .getRange(from, from + request.pageSize) .to(Iterable) } @@ -298,26 +323,24 @@ class ExecutionResultService( PaginatedResultEvent.apply(request, mappedResults, attributes) } - private def onResultUpdate(physicalPlan: PhysicalPlan): Unit = { + private def onResultUpdate(executionId: ExecutionIdentity, physicalPlan: PhysicalPlan): Unit = { workflowStateStore.resultStore.updateState { _ => val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = { - ResultStorage - .getOpResultStorage(workflowIdentity) - .getAllKeys - .filter(!_.startsWith("materialized_")) - .map(storageKey => { - val count = ResultStorage - .getOpResultStorage(workflowIdentity) - .get(storageKey) - .getCount - .toInt - - val (opId, storagePortId) = OpResultStorage.decodeStorageKey(storageKey) + ExecutionResourcesMapping + .getResourceURIs(executionId) + .filter(uri => { + val (_, _, _, _, resourceType) = VFSURIFactory.decodeURI(uri) + resourceType != MATERIALIZED_RESULT + }) + .map(uri => { + val count = DocumentFactory.openDocument(uri)._1.getCount.toInt + + val (_, _, opId, storagePortId, _) = VFSURIFactory.decodeURI(uri) // Retrieve the mode of the specified output port val mode = physicalPlan .getPhysicalOpsOfLogicalOp(opId) - .flatMap(_.outputPorts.get(storagePortId)) + .flatMap(_.outputPorts.get(storagePortId.get)) .map(_._1.mode) .head diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala index f57eb29fc79..078efbb2c93 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala @@ -7,8 +7,9 @@ import com.google.api.services.drive.Drive import com.google.api.services.drive.model.{File, FileList, Permission} import com.google.api.services.sheets.v4.Sheets import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange} +import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} +import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument -import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.engine.common.Utils.retry import edu.uci.ics.amber.util.PathUtils @@ -22,6 +23,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowVersionRe import org.jooq.types.UInteger import edu.uci.ics.amber.util.ArrowUtils import edu.uci.ics.amber.core.workflow.PortIdentity +import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId import java.io.{PipedInputStream, PipedOutputStream} import java.nio.charset.StandardCharsets @@ -56,7 +58,6 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) { import ResultExportService._ private val cache = new mutable.HashMap[String, String] - def exportResult( user: User, request: ResultExportRequest @@ -71,11 +72,17 @@ class 
ResultExportService(workflowIdentity: WorkflowIdentity) { // By now the workflow should finish running // Only supports external port 0 for now. TODO: support multiple ports + val storageUri = VFSURIFactory.createResultURI( + workflowIdentity, + getLatestExecutionId(workflowIdentity).getOrElse( + return ResultExportResponse("error", "The workflow contains no results") + ), + OperatorIdentity(request.operatorId), + PortIdentity() + ) val operatorResult: VirtualDocument[Tuple] = - ResultStorage - .getOpResultStorage(workflowIdentity) - .get(OpResultStorage.createStorageKey(OperatorIdentity(request.operatorId), PortIdentity())) - if (operatorResult == null) { + DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]] + if (operatorResult.getCount == 0) { return ResultExportResponse("error", "The workflow contains no results") } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala index e24fbad4199..fb9a666dd3b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala @@ -1,7 +1,9 @@ package edu.uci.ics.texera.web.service import com.typesafe.scalalogging.LazyLogging +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.core.workflow.WorkflowContext +import edu.uci.ics.amber.core.workflow.WorkflowContext.DEFAULT_EXECUTION_ID import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, Workflow} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState._ @@ -26,6 +28,17 @@ import java.net.URI import java.util import scala.collection.mutable +object WorkflowExecutionService { + def getLatestExecutionId(workflowId: WorkflowIdentity): Option[ExecutionIdentity] = { + if (!AmberConfig.isUserSystemEnabled) { + return Some(DEFAULT_EXECUTION_ID) + } + WorkflowExecutionsResource + .getLatestExecutionID(UInteger.valueOf(workflowId.id)) + .map(eid => new ExecutionIdentity(eid.longValue())) + } +} + class WorkflowExecutionService( controllerConfig: ControllerConfig, val workflowContext: WorkflowContext, @@ -125,7 +138,12 @@ class WorkflowExecutionService( executionConsoleService = new ExecutionConsoleService(client, executionStateStore, wsInput) logger.info("Starting the workflow execution.") - resultService.attachToExecution(executionStateStore, workflow.physicalPlan, client) + resultService.attachToExecution( + workflowContext.executionId, + executionStateStore, + workflow.physicalPlan, + client + ) executionStateStore.metadataStore.updateState(metadataStore => updateWorkflowState(READY, metadataStore) .withFatalErrors(Seq.empty) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala index 9610e00c3a4..6aa32d84d92 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala @@ -3,7 +3,8 @@ package edu.uci.ics.texera.web.service import com.google.protobuf.timestamp.Timestamp import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.WorkflowRuntimeException -import 
edu.uci.ics.amber.core.storage.result.ResultStorage +import edu.uci.ics.amber.core.storage.DocumentFactory +import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ @@ -21,11 +22,13 @@ import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, WorkflowIdentity } +import edu.uci.ics.amber.core.workflow.WorkflowContext.DEFAULT_EXECUTION_ID import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent import edu.uci.ics.texera.web.model.websocket.request.WorkflowExecuteRequest +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import edu.uci.ics.texera.web.service.WorkflowService.mkWorkflowStateId import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} @@ -33,6 +36,7 @@ import edu.uci.ics.texera.web.{SubscriptionManager, WorkflowLifecycleManager} import edu.uci.ics.texera.workflow.LogicalPlan import io.reactivex.rxjava3.disposables.{CompositeDisposable, Disposable} import io.reactivex.rxjava3.subjects.BehaviorSubject +import org.jooq.types.UInteger import play.api.libs.json.Json import java.net.URI @@ -72,6 +76,7 @@ class WorkflowService( cleanUpTimeout: Int ) extends SubscriptionManager with LazyLogging { + // state across execution: private val errorSubject = BehaviorSubject.create[TexeraWebSocketEvent]().toSerialized val stateStore = new WorkflowStateStore() @@ -83,7 +88,20 @@ class WorkflowService( s"workflowId=$workflowId", cleanUpTimeout, () => { - ResultStorage.getOpResultStorage(workflowId).clear() + // clear the storage resources associated with the latest execution + WorkflowExecutionService + .getLatestExecutionId(workflowId) + .foreach(eid => { + ExecutionResourcesMapping + .getResourceURIs(eid) + .foreach(uri => + try { + DocumentFactory.openDocument(uri)._1.clear() + } catch { + case _: Throwable => // exception can be raised if the document is already cleared + } + ) + }) WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId)) if (executionService.getValue != null) { // shutdown client @@ -155,6 +173,20 @@ class WorkflowService( val workflowContext: WorkflowContext = createWorkflowContext() var controllerConf = ControllerConfig.default + // clean up results from previous run + val previousExecutionId = WorkflowExecutionService.getLatestExecutionId(workflowId) + previousExecutionId.foreach(eid => { + ExecutionResourcesMapping + .getResourceURIs(eid) + .foreach(uri => + try { + DocumentFactory.openDocument(uri)._1.clear() + } catch { // exception can happen if the resource is already cleared + case _: Throwable => + } + ) + }) // TODO: change this behavior after enabling cache. + workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution( workflowContext.workflowId, uidOpt, @@ -225,11 +257,6 @@ class WorkflowService( } } } - - // clean up results from previous run - ResultStorage - .getOpResultStorage(workflowId) - .clear() // TODO: change this behavior after enabling cache. 
try { val execution = new WorkflowExecutionService( controllerConf, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 493015de005..f811dee6ae7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -1,7 +1,9 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} +import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT +import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} +import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.engine.common.Utils.objectMapper @@ -76,40 +78,43 @@ class WorkflowCompiler( .filterNot(_._1.internal) .foreach { case (outputPortId, (outputPort, _, schema)) => - val storage = ResultStorage.getOpResultStorage(context.workflowId) - val storageKey = - OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId) + val storageUri = VFSURIFactory.createResultURI( + context.workflowId, + context.executionId, + physicalOp.id.logicalOpId, + outputPortId + ) // Determine the storage type, defaulting to iceberg for large HTML visualizations val storageType = - if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.ICEBERG - else OpResultStorage.defaultStorageMode - - if (!storage.contains(storageKey)) { + if (outputPort.mode == SINGLE_SNAPSHOT) DocumentFactory.ICEBERG + else StorageConfig.resultStorageMode + + if ( + !ExecutionResourcesMapping + .getResourceURIs(context.executionId) + .contains(storageUri) + ) { // Create storage if it doesn't exist val sinkStorageSchema = schema.getOrElse(throw new IllegalStateException("Schema is missing")) - storage.create( - s"${context.executionId}_", - storageKey, - storageType, - sinkStorageSchema - ) + + // create the storage resource and record the URI to the global mapping + DocumentFactory.createDocument(storageUri, sinkStorageSchema) + ExecutionResourcesMapping.addResourceUri(context.executionId, storageUri) // Add sink collection name to the JSON array of sinks sinksPointers.add( objectMapper .createObjectNode() .put("storageType", storageType) - .put("storageKey", s"${context.executionId}_$storageKey") + .put("storageKey", storageUri.toString) ) } // Create and link the sink operator val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( - context.workflowId, - context.executionId, - storageKey, + storageUri, outputPort.mode ) val sinkLink = PhysicalLink( diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala index 9eca6949048..e29d4e1c7f5 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -4,7 +4,6 @@ import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.core.storage.result.OpResultStorage import 
edu.uci.ics.amber.core.workflow.{WorkflowContext, WorkflowSettings} import edu.uci.ics.amber.engine.architecture.controller._ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._ @@ -28,15 +27,12 @@ class BatchSizePropagationSpec implicit val timeout: Timeout = Timeout(5.seconds) - val resultStorage = new OpResultStorage() - override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") } override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) - resultStorage.clear() } def verifyBatchSizeInPartitioning( diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 9ef3a1ae101..81decb71c56 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -7,7 +7,10 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} +import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} +import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT +import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.controller._ @@ -38,7 +41,6 @@ class DataProcessingSpec var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() - val resultStorage: OpResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId) override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") @@ -47,7 +49,6 @@ class DataProcessingSpec override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) - resultStorage.clear() } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { @@ -69,16 +70,32 @@ class DataProcessingSpec .registerCallback[ExecutionStateUpdate](evt => { if (evt.state == COMPLETED) { results = workflow.logicalPlan.getTerminalOperatorIds - .filter(terminalOpId => + .filter(terminalOpId => { + val uri = VFSURIFactory.createResultURI( + workflowContext.workflowId, + workflowContext.executionId, + terminalOpId, + PortIdentity() + ) // expecting the first output port only. 
- resultStorage.contains(OpResultStorage.createStorageKey(terminalOpId, PortIdentity())) - ) - .map(terminalOpId => - terminalOpId -> resultStorage - .get(OpResultStorage.createStorageKey(terminalOpId, PortIdentity())) + ExecutionResourcesMapping + .getResourceURIs(workflowContext.executionId) + .contains(uri) + }) + .map(terminalOpId => { + val uri = VFSURIFactory.createResultURI( + workflowContext.workflowId, + workflowContext.executionId, + terminalOpId, + PortIdentity() + ) + terminalOpId -> DocumentFactory + .openDocument(uri) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] .get() .toList - ) + }) .toMap completion.setDone() } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index c1b0f319c50..53a8a9b3008 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -7,7 +7,6 @@ import akka.util.Timeout import com.twitter.util.{Await, Promise} import com.typesafe.scalalogging.Logger import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.core.storage.result.OpResultStorage import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest @@ -45,7 +44,6 @@ class PauseSpec operators: List[LogicalOp], links: List[LogicalLink] ): Unit = { - val resultStorage = new OpResultStorage() val workflow = TestUtils.buildWorkflow(operators, links, new WorkflowContext()) val client = new AmberClient( diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala index cd423741b10..9d7dfa085e4 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala @@ -3,7 +3,6 @@ package edu.uci.ics.amber.engine.faulttolerance import akka.actor.{ActorSystem, Props} import akka.serialization.SerializationExtension import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.core.storage.result.OpResultStorage import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ControllerProcessor} import edu.uci.ics.amber.engine.architecture.worker.DataProcessor @@ -21,7 +20,6 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { var system: ActorSystem = _ - val resultStorage = new OpResultStorage() val csvOpDesc = TestOperators.mediumCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") val workflow = buildWorkflow( diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala index c3ed0d32cd2..4927695868b 100644 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala @@ -1,6 +1,7 @@ package edu.uci.ics.amber.engine.common.storage -import edu.uci.ics.amber.core.storage.model.ReadonlyLocalFileDocument +import 
edu.uci.ics.amber.core.storage.DocumentFactory
+import edu.uci.ics.amber.core.storage.model.{ReadonlyLocalFileDocument, ReadonlyVirtualDocument}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
@@ -12,7 +13,7 @@ import scala.util.Using
 
 class ReadonlyLocalFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {
   var tempFile: File = _
-  var fileDocument: ReadonlyLocalFileDocument = _
+  var fileDocument: ReadonlyVirtualDocument[_] = _
 
   val initialContent = "Initial Content\nsome more content to make the text longer\nadf\t\ttestteset"
 
@@ -21,7 +22,7 @@ class ReadonlyLocalFileDocumentSpec extends AnyFlatSpec with Matchers with Befor
     // Create a temporary file with initial content
     tempFile = File.createTempFile("test", ".txt")
     Files.write(tempFile.toPath, initialContent.getBytes)
-    fileDocument = new ReadonlyLocalFileDocument(tempFile.toURI)
+    fileDocument = DocumentFactory.openReadonlyDocument(tempFile.toURI)
   }
 
   after {
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala
index 27fa25ec116..e38f8ee9c86 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala
@@ -3,24 +3,166 @@ package edu.uci.ics.amber.core.storage
 import edu.uci.ics.amber.core.storage.model.{
   DatasetFileDocument,
   ReadonlyLocalFileDocument,
-  ReadonlyVirtualDocument
+  ReadonlyVirtualDocument,
+  VirtualDocument
 }
 import FileResolver.DATASET_FILE_URI_SCHEME
+import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT}
+import edu.uci.ics.amber.core.storage.VFSURIFactory.{VFS_FILE_URI_SCHEME, decodeURI}
+import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
+import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
+import edu.uci.ics.amber.util.IcebergUtil
+import org.apache.iceberg.data.Record
+import org.apache.iceberg.{Schema => IcebergSchema}
 
 import java.net.URI
 
 object DocumentFactory {
-  def newReadonlyDocument(fileUri: URI): ReadonlyVirtualDocument[_] = {
+
+  val MONGODB: String = "mongodb"
+  val ICEBERG: String = "iceberg"
+
+  private def sanitizeURIPath(uri: URI): String = uri.getPath.stripPrefix("/").replace("/", "_")
+
+  /**
+   * Open the document specified by the URI for read purposes only.
+   * @param fileUri the URI of the document
+   * @return ReadonlyVirtualDocument
+   */
+  def openReadonlyDocument(fileUri: URI): ReadonlyVirtualDocument[_] = {
     fileUri.getScheme match {
       case DATASET_FILE_URI_SCHEME =>
         new DatasetFileDocument(fileUri)
 
       case "file" =>
-        // For local files, create a ReadonlyLocalFileDocument
         new ReadonlyLocalFileDocument(fileUri)
 
       case _ =>
-        throw new UnsupportedOperationException(s"Unsupported URI scheme: ${fileUri.getScheme}")
+        throw new UnsupportedOperationException(
+          s"Unsupported URI scheme: ${fileUri.getScheme} for creating the ReadonlyDocument"
+        )
+    }
+  }
+
+  /**
+   * Create a document for storage at the location specified by the URI.
+   * This document is suitable for storing structured data, i.e., a schema is required to create such a document.
+   * @param uri the location of the document
+   * @param schema the schema of the data stored in the document
+   * @return the created document
+   */
+  def createDocument(uri: URI, schema: Schema): VirtualDocument[_] = {
+    uri.getScheme match {
+      case VFS_FILE_URI_SCHEME =>
+        val (_, _, _, _, resourceType) = decodeURI(uri)
+
+        resourceType match {
+          case RESULT | MATERIALIZED_RESULT =>
+            val storageKey = sanitizeURIPath(uri)
+
+            StorageConfig.resultStorageMode.toLowerCase match {
+              case ICEBERG =>
+                val icebergSchema = IcebergUtil.toIcebergSchema(schema)
+                IcebergUtil.createTable(
+                  IcebergCatalogInstance.getInstance(),
+                  StorageConfig.icebergTableNamespace,
+                  storageKey,
+                  icebergSchema,
+                  overrideIfExists = true
+                )
+                val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
+                val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
+                  IcebergUtil.fromRecord(record, schema)
+
+                new IcebergDocument[Tuple](
+                  StorageConfig.icebergTableNamespace,
+                  storageKey,
+                  icebergSchema,
+                  serde,
+                  deserde
+                )
+
+              case _ =>
+                throw new IllegalArgumentException(
+                  s"Storage mode '${StorageConfig.resultStorageMode}' is not supported"
+                )
+            }
+
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Resource type $resourceType is not supported"
+            )
+        }
+
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unsupported URI scheme: ${uri.getScheme} for creating the document"
+        )
+    }
+  }
+
+  /**
+   * Open the document specified by the URI.
+   * If the document stores structured data, its schema is also returned.
+   * @param uri the URI of the document
+   * @return the VirtualDocument serving as the handle to the data, paired with the schema of the stored data, if any
+   */
+  def openDocument(uri: URI): (VirtualDocument[_], Option[Schema]) = {
+    uri.getScheme match {
+      case DATASET_FILE_URI_SCHEME =>
+        (new DatasetFileDocument(uri), None)
+
+      case VFS_FILE_URI_SCHEME =>
+        val (_, _, _, _, resourceType) = decodeURI(uri)
+
+        resourceType match {
+          case RESULT | MATERIALIZED_RESULT =>
+            val storageKey = sanitizeURIPath(uri)
+
+            StorageConfig.resultStorageMode.toLowerCase match {
+              case ICEBERG =>
+                val table = IcebergUtil
+                  .loadTableMetadata(
+                    IcebergCatalogInstance.getInstance(),
+                    StorageConfig.icebergTableNamespace,
+                    storageKey
+                  )
+                  .getOrElse(
+                    throw new IllegalArgumentException("No storage is found for the given URI")
+                  )
+
+                val amberSchema = IcebergUtil.fromIcebergSchema(table.schema())
+                val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
+                val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
+                  IcebergUtil.fromRecord(record, amberSchema)
+
+                (
+                  new IcebergDocument[Tuple](
+                    StorageConfig.icebergTableNamespace,
+                    storageKey,
+                    table.schema(),
+                    serde,
+                    deserde
+                  ),
+                  Some(amberSchema)
+                )
+
+              case _ =>
+                throw new IllegalArgumentException(
+                  s"Storage mode '${StorageConfig.resultStorageMode}' is not supported"
+                )
+            }
+
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Resource type $resourceType is not supported"
+            )
+        }
+
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unsupported URI scheme: ${uri.getScheme} for opening the document"
+        )
     }
   }
 }
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
index c90707a77a8..fcb391cba7f 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
@@ -1,5 +1,12 @@
 package edu.uci.ics.amber.core.storage
 
+import edu.uci.ics.amber.core.storage.FileResolver.DATASET_FILE_URI_SCHEME
+import edu.uci.ics.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  WorkflowIdentity
+}
+import edu.uci.ics.amber.core.workflow.PortIdentity
 import edu.uci.ics.texera.dao.SqlServer
 import edu.uci.ics.texera.dao.SqlServer.withTransaction
 import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
@@ -14,15 +21,24 @@ import java.nio.file.{Files, Paths}
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 import scala.util.{Success, Try}
 
+object VFSResourceType extends Enumeration {
+  val RESULT: Value = Value("result")
+  val MATERIALIZED_RESULT: Value = Value("materializedResult")
+}
+
+/**
+ * Resolves local files and dataset files into URIs.
+ */
 object FileResolver {
-  val DATASET_FILE_URI_SCHEME = "vfs"
+
+  val DATASET_FILE_URI_SCHEME = "dataset"
 
   /**
-   * Attempts to resolve the given fileName using a list of resolver functions.
+   * Resolves a given fileName to either a file on the local file system or a dataset file.
    *
-   * @param fileName the name of the file to resolve
-   * @throws FileNotFoundException if the file cannot be resolved by any resolver
-   * @return Either[String, DatasetFileDocument] - the resolved path as a String or a DatasetFileDocument
+   * @param fileName the name of the file to resolve.
+   * @throws FileNotFoundException if the file cannot be resolved.
+   * @return A URI pointing to the resolved file.
    */
   def resolve(fileName: String): URI = {
     if (isFileResolved(fileName)) {
@@ -58,7 +74,7 @@ object FileResolver {
    * The fileName format should be: /ownerEmail/datasetName/versionName/fileRelativePath
    * e.g. /bob@texera.com/twitterDataset/v1/california/irvine/tw1.csv
    * The output dataset URI format is: {DATASET_FILE_URI_SCHEME}:///{did}/{versionHash}/file-path
-   * e.g. vfs:///15/adeq233td/some/dir/file.txt
+   * e.g. {DATASET_FILE_URI_SCHEME}:///15/adeq233td/some/dir/file.txt
    *
    * @param fileName the name of the file to attempt resolving as a DatasetFileDocument
    * @return Either[String, DatasetFileDocument] - Right(document) if creation succeeds
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala
new file mode 100644
index 00000000000..677365285a6
--- /dev/null
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala
@@ -0,0 +1,147 @@
+package edu.uci.ics.amber.core.storage
+
+import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT}
+import edu.uci.ics.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  WorkflowIdentity
+}
+import edu.uci.ics.amber.core.workflow.PortIdentity
+
+import java.net.URI
+
+object VFSURIFactory {
+  val VFS_FILE_URI_SCHEME = "vfs"
+
+  /**
+   * Parses a VFS URI and extracts its components.
+   *
+   * @param uri The VFS URI to parse.
+   * @return A tuple of (workflow ID, execution ID, operator ID, optional port identity, resource type).
+   * @throws IllegalArgumentException if the URI is malformed.
+   */
+  def decodeURI(uri: URI): (
+      WorkflowIdentity,
+      ExecutionIdentity,
+      OperatorIdentity,
+      Option[PortIdentity],
+      VFSResourceType.Value
+  ) = {
+    if (uri.getScheme != VFS_FILE_URI_SCHEME) {
+      throw new IllegalArgumentException(s"Invalid URI scheme: ${uri.getScheme}")
+    }
+
+    val segments = uri.getPath.stripPrefix("/").split("/").toList
+
+    def extractValue(key: String): String = {
+      val index = segments.indexOf(key)
+      if (index == -1 || index + 1 >= segments.length) {
+        throw new IllegalArgumentException(s"Missing value for key: $key in URI: $uri")
+      }
+      segments(index + 1)
+    }
+
+    val workflowId = WorkflowIdentity(extractValue("wid").toLong)
+    val executionId = ExecutionIdentity(extractValue("eid").toLong)
+    val operatorId = OperatorIdentity(extractValue("opid"))
+
+    val portIdentity: Option[PortIdentity] = segments.indexOf("pid") match {
+      case -1 => None
+      case idx if idx + 1 < segments.length =>
+        val Array(portIdStr, portType) = segments(idx + 1).split("_")
+        val portId = portIdStr.toInt
+        val isInternal = portType match {
+          case "I" => true
+          case "E" => false
+          case _   => throw new IllegalArgumentException(s"Invalid port type: $portType in URI: $uri")
+        }
+        Some(PortIdentity(portId, isInternal))
+      case _ =>
+        throw new IllegalArgumentException(s"Invalid port information in URI: $uri")
+    }
+
+    val resourceTypeStr = segments.last.toLowerCase
+    val resourceType = VFSResourceType.values
+      .find(_.toString.toLowerCase == resourceTypeStr)
+      .getOrElse(throw new IllegalArgumentException(s"Unknown resource type: $resourceTypeStr"))
+
+    (workflowId, executionId, operatorId, portIdentity, resourceType)
+  }
+
+  /**
+   * Create a URI pointing to a result storage.
+   */
+  def createResultURI(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity,
+      operatorId: OperatorIdentity,
+      portIdentity: PortIdentity
+  ): URI = {
+    createVFSURI(
+      RESULT,
+      workflowId,
+      executionId,
+      operatorId,
+      Some(portIdentity)
+    )
+  }
+
+  /**
+   * Create a URI pointing to a materialized result storage.
+   */
+  def createMaterializedResultURI(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity,
+      operatorId: OperatorIdentity,
+      portIdentity: PortIdentity
+  ): URI = {
+    createVFSURI(
+      MATERIALIZED_RESULT,
+      workflowId,
+      executionId,
+      operatorId,
+      Some(portIdentity)
+    )
+  }
+
+  /**
+   * Internal helper to create a URI pointing to a VFS resource. The URI can be used by the
+   * DocumentFactory to create or open the resource.
+   *
+   * @param resourceType The type of the VFS resource.
+   * @param workflowId   Workflow identifier.
+   * @param executionId  Execution identifier.
+   * @param operatorId   Operator identifier.
+   * @param portIdentity Optional port identifier. **Required** if `resourceType` is `RESULT` or `MATERIALIZED_RESULT`.
+   * @return A VFS URI.
+   * @throws IllegalArgumentException if `resourceType` is `RESULT` or `MATERIALIZED_RESULT` but `portIdentity` is missing.
+   */
+  private def createVFSURI(
+      resourceType: VFSResourceType.Value,
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity,
+      operatorId: OperatorIdentity,
+      portIdentity: Option[PortIdentity] = None
+  ): URI = {
+
+    if (
+      (resourceType == VFSResourceType.RESULT || resourceType == VFSResourceType.MATERIALIZED_RESULT) && portIdentity.isEmpty
+    ) {
+      throw new IllegalArgumentException(
+        "PortIdentity must be provided when resourceType is RESULT or MATERIALIZED_RESULT."
+ ) + } + + val baseUri = + s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}/opid/${operatorId.id}" + + val uriWithPort = portIdentity match { + case Some(port) => + val portType = if (port.internal) "I" else "E" + s"$baseUri/pid/${port.id}_$portType" + case None => + baseUri + } + + new URI(s"$uriWithPort/${resourceType.toString.toLowerCase}") + } +} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala index 8ad3ffe3102..d050915cc7c 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala @@ -10,7 +10,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths} import scala.jdk.CollectionConverters.IteratorHasAsScala -class DatasetFileDocument(uri: URI) extends VirtualDocument[Nothing] { +private[storage] class DatasetFileDocument(uri: URI) extends VirtualDocument[Nothing] { // Utility function to parse and decode URI segments into individual components private def parseUri(uri: URI): (Int, String, Path) = { val segments = Paths.get(uri.getPath).iterator().asScala.map(_.toString).toArray diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/ReadonlyLocalFileDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/ReadonlyLocalFileDocument.scala index 652184fb420..7348db75c0c 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/ReadonlyLocalFileDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/ReadonlyLocalFileDocument.scala @@ -7,7 +7,8 @@ import java.net.URI * ReadonlyLocalFileDocument provides a read-only abstraction over a local file. * The data type T is not required, as all iterator-related methods are unsupported */ -class ReadonlyLocalFileDocument(uri: URI) extends ReadonlyVirtualDocument[Nothing] { +private[storage] class ReadonlyLocalFileDocument(uri: URI) + extends ReadonlyVirtualDocument[Nothing] { /** * Get the URI of the corresponding document. diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala new file mode 100644 index 00000000000..511c39e41e2 --- /dev/null +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala @@ -0,0 +1,46 @@ +package edu.uci.ics.amber.core.storage.result + +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} + +import java.net.URI +import scala.collection.mutable + +/** + * ExecutionResourcesMapping is a singleton for keeping track of resources associated with each execution. + * It maintains a mapping from execution ID to a list of URIs, which point to resources like the result storage. + * + * Currently, this mapping is only used during the resource clean-up phase. + * + * This design has one limitation: the singleton is only accessible on the master node. Consequently, all sink executors + * must execute on the master node. While this aligns with the current system design, improvements are needed in the + * future to enhance scalability and flexibility. 
+ *
+ * TODO: Move the mappings to an external, distributed, and persistent location to eliminate the master-node
+ * dependency and enable sink executors to run on other nodes.
+ */
+object ExecutionResourcesMapping {
+
+  private val executionIdToExecutionResourcesMapping: mutable.Map[ExecutionIdentity, List[URI]] =
+    mutable.Map.empty
+
+  /**
+   * Get the resource URIs of a given execution ID.
+   * @param executionIdentity the target execution ID
+   * @return the list of resource URIs associated with the execution
+   */
+  def getResourceURIs(executionIdentity: ExecutionIdentity): List[URI] = {
+    executionIdToExecutionResourcesMapping.getOrElseUpdate(executionIdentity, List())
+  }
+
+  /**
+   * Add a resource URI to the mapping.
+   * @param executionIdentity the target execution
+   * @param uri the URI of the resource
+   */
+  def addResourceUri(executionIdentity: ExecutionIdentity, uri: URI): Unit = {
+    executionIdToExecutionResourcesMapping.updateWith(executionIdentity) {
+      case Some(existingUris) => Some(uri :: existingUris) // Prepend URI to the existing list
+      case None               => Some(List(uri)) // Create a new list if key doesn't exist
+    }
+  }
+}
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala
index 9cc2f5c2860..3cd5008cd13 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala
@@ -18,6 +18,7 @@ import java.util.Date
  * @param fromDocument a function that converts a MongoDB Document to an item of type T.
  * @tparam T the type of data items stored in the document.
  */
+@Deprecated
 class MongoDocument[T >: Null <: AnyRef](
     id: String,
     var toDocument: T => Document,
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
deleted file mode 100644
index 52e916be331..00000000000
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-package edu.uci.ics.amber.core.storage.result
-
-import com.typesafe.scalalogging.LazyLogging
-import edu.uci.ics.amber.core.storage.StorageConfig
-import edu.uci.ics.amber.core.storage.model.VirtualDocument
-import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
-import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
-import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
-import edu.uci.ics.amber.core.workflow.PortIdentity
-import edu.uci.ics.amber.util.IcebergUtil
-import org.apache.iceberg.data.Record
-import org.apache.iceberg.{Schema => IcebergSchema}
-
-import java.util.concurrent.ConcurrentHashMap
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-/**
- * Companion object for `OpResultStorage`, providing utility functions
- * for key generation, decoding, and storage modes.
- */
-object OpResultStorage {
-  val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase
-  val MONGODB: String = "mongodb"
-  val ICEBERG = "iceberg"
-
-  /**
-   * Creates a unique storage key by combining operator and port identities.
-   *
-   * @param operatorId The unique identifier of the operator.
-   * @param portIdentity The unique identifier of the port.
-   * @param isMaterialized Indicates whether the storage is materialized (e.g., persisted).
- * @return A string representing the generated storage key, formatted as: - * "materialized___" if materialized, - * otherwise "__". - */ - def createStorageKey( - operatorId: OperatorIdentity, - portIdentity: PortIdentity, - isMaterialized: Boolean = false - ): String = { - val prefix = if (isMaterialized) "materialized_" else "" - s"$prefix${operatorId.id}_${portIdentity.id}_${portIdentity.internal}" - } - - /** - * Decodes a storage key back into its original components. - * - * @param key The storage key to decode. - * @return A tuple containing the operator identity and port identity. - * @throws IllegalArgumentException If the key format is invalid. - */ - def decodeStorageKey(key: String): (OperatorIdentity, PortIdentity) = { - val processedKey = if (key.startsWith("materialized_")) key.substring(13) else key - processedKey.split("_", 3) match { - case Array(opId, portId, internal) => - (OperatorIdentity(opId), PortIdentity(portId.toInt, internal.toBoolean)) - case _ => - throw new IllegalArgumentException(s"Invalid storage key: $key") - } - } -} - -/** - * Handles the storage of operator results during workflow execution. - * Each `OpResultStorage` instance is tied to the lifecycle of a single execution. - */ -class OpResultStorage extends Serializable with LazyLogging { - - /** - * In-memory cache for storing results and their associated schemas. - * TODO: Once the storage is self-contained (i.e., stores schemas as metadata), - * this can be removed. - */ - private val cache: ConcurrentHashMap[String, (VirtualDocument[Tuple], Schema)] = - new ConcurrentHashMap() - - /** - * Retrieves the result of an operator from the storage. - * - * @param key The storage key associated with the result. - * @return The result stored as a `VirtualDocument[Tuple]`. - * @throws NoSuchElementException If the key is not found in the cache. - */ - def get(key: String): VirtualDocument[Tuple] = { - Option(cache.get(key)) match { - case Some((document, _)) => document - case None => throw new NoSuchElementException(s"Storage with key $key not found") - } - } - - /** - * Retrieves the schema associated with an operator's result. - * - * @param key The storage key associated with the schema. - * @return The schema of the result. - */ - def getSchema(key: String): Schema = { - cache.get(key)._2 - } - - /** - * Creates a new storage object for an operator result. - * - * @param executionId An optional execution ID for unique identification. - * @param key The storage key for the result. - * @param mode The storage mode (e.g., "memory" or "mongodb"). - * @param schema The schema of the result. - * @return A `VirtualDocument[Tuple]` instance for storing results. - */ - def create( - executionId: String = "", - key: String, - mode: String, - schema: Schema - ): VirtualDocument[Tuple] = { - val storage: VirtualDocument[Tuple] = - if (mode == OpResultStorage.MONGODB) { - try { - new MongoDocument[Tuple]( - executionId + key, - Tuple.toDocument, - Tuple.fromDocument(schema) - ) - } catch { - case t: Throwable => - logger.warn("Failed to create MongoDB storage", t) - logger.info(s"Falling back to Iceberg storage for $key") - createIcebergDocument(executionId, key, schema) - } - } else if (mode == OpResultStorage.ICEBERG) { - createIcebergDocument(executionId, key, schema) - } else { - throw new IllegalArgumentException(s"Unsupported storage mode: $mode") - } - cache.put(key, (storage, schema)) - storage - } - - /** - * Checks if a storage key exists in the cache. - * - * @param key The storage key to check. 
-
-/**
-  * Handles the storage of operator results during workflow execution.
-  * Each `OpResultStorage` instance is tied to the lifecycle of a single execution.
-  */
-class OpResultStorage extends Serializable with LazyLogging {
-
-  /**
-    * In-memory cache for storing results and their associated schemas.
-    * TODO: Once the storage is self-contained (i.e., stores schemas as metadata),
-    * this can be removed.
-    */
-  private val cache: ConcurrentHashMap[String, (VirtualDocument[Tuple], Schema)] =
-    new ConcurrentHashMap()
-
-  /**
-    * Retrieves the result of an operator from the storage.
-    *
-    * @param key The storage key associated with the result.
-    * @return The result stored as a `VirtualDocument[Tuple]`.
-    * @throws NoSuchElementException If the key is not found in the cache.
-    */
-  def get(key: String): VirtualDocument[Tuple] = {
-    Option(cache.get(key)) match {
-      case Some((document, _)) => document
-      case None                => throw new NoSuchElementException(s"Storage with key $key not found")
-    }
-  }
-
-  /**
-    * Retrieves the schema associated with an operator's result.
-    *
-    * @param key The storage key associated with the schema.
-    * @return The schema of the result.
-    */
-  def getSchema(key: String): Schema = {
-    cache.get(key)._2
-  }
-
-  /**
-    * Creates a new storage object for an operator result.
-    *
-    * @param executionId An optional execution ID for unique identification.
-    * @param key The storage key for the result.
-    * @param mode The storage mode (e.g., "memory" or "mongodb").
-    * @param schema The schema of the result.
-    * @return A `VirtualDocument[Tuple]` instance for storing results.
-    */
-  def create(
-      executionId: String = "",
-      key: String,
-      mode: String,
-      schema: Schema
-  ): VirtualDocument[Tuple] = {
-    val storage: VirtualDocument[Tuple] =
-      if (mode == OpResultStorage.MONGODB) {
-        try {
-          new MongoDocument[Tuple](
-            executionId + key,
-            Tuple.toDocument,
-            Tuple.fromDocument(schema)
-          )
-        } catch {
-          case t: Throwable =>
-            logger.warn("Failed to create MongoDB storage", t)
-            logger.info(s"Falling back to Iceberg storage for $key")
-            createIcebergDocument(executionId, key, schema)
-        }
-      } else if (mode == OpResultStorage.ICEBERG) {
-        createIcebergDocument(executionId, key, schema)
-      } else {
-        throw new IllegalArgumentException(s"Unsupported storage mode: $mode")
-      }
-    cache.put(key, (storage, schema))
-    storage
-  }
-
-  /**
-    * Checks if a storage key exists in the cache.
-    *
-    * @param key The storage key to check.
-    * @return True if the key exists, false otherwise.
-    */
-  def contains(key: String): Boolean = cache.containsKey(key)
-
-  /**
-    * Clears all stored results. Typically used during workflow cleanup.
-    */
-  def clear(): Unit = {
-    cache.forEach((_, document) => document._1.clear())
-    cache.clear()
-  }
-
-  /**
-    * Retrieves all storage keys currently in the cache.
-    *
-    * @return A set of all keys in the cache.
-    */
-  def getAllKeys: Set[String] = {
-    cache.keySet().iterator().asScala.toSet
-  }
-
-  private def createIcebergDocument(
-      executionId: String,
-      key: String,
-      schema: Schema
-  ): IcebergDocument[Tuple] = {
-    val icebergSchema = IcebergUtil.toIcebergSchema(schema)
-    val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
-    val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
-      IcebergUtil.fromRecord(record, schema)
-
-    new IcebergDocument[Tuple](
-      StorageConfig.icebergTableNamespace,
-      executionId + key,
-      icebergSchema,
-      serde,
-      deserde
-    )
-  }
-}
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ResultStorage.scala
deleted file mode 100644
index 701ee9c01fe..00000000000
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ResultStorage.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package edu.uci.ics.amber.core.storage.result
-
-import edu.uci.ics.amber.core.virtualidentity.WorkflowIdentity
-
-import scala.collection.mutable
-
-/**
-  * ResultStorage is a singleton for accessing storage objects. It maintains a mapping from workflow ID to OpResultStorage.
-  *
-  * Using a workflow ID and an operator ID, the corresponding OpResultStorage object can be resolved and retrieved globally.
-  *
-  * This design has one limitation: the singleton is only accessible on the master node. Consequently, all sink executors
-  * must execute on the master node. While this aligns with the current system design, improvements are needed in the
-  * future to enhance scalability and flexibility.
-  *
-  * TODO: Move the storage mappings to an external, distributed, and persistent location to eliminate the master-node
-  * dependency and enable sink executors to run on other nodes.
-  */
-object ResultStorage {
-
-  private val workflowIdToOpResultMapping: mutable.Map[WorkflowIdentity, OpResultStorage] =
-    mutable.Map.empty
-
-  def getOpResultStorage(workflowIdentity: WorkflowIdentity): OpResultStorage = {
-    workflowIdToOpResultMapping.getOrElseUpdate(workflowIdentity, new OpResultStorage())
-  }
-}
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
index 24ee6b6956c..bd004400b0d 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -17,8 +17,7 @@ import scala.jdk.CollectionConverters._
  * IcebergDocument is used to read and write a set of T as an Iceberg table.
  * It provides iterator-based read methods and supports multiple writers to write to the same table.
  *
- * - On construction, the table will be created if it does not exist.
- * - If the table exists, it will be overridden.
+ * The table must exist when constructing the document.
  *
  * @param tableNamespace namespace of the table.
  * @param tableName name of the table.
@@ -27,7 +26,7 @@ import scala.jdk.CollectionConverters._
  * @param deserde function to deserialize an Iceberg Record into T.
  * @tparam T type of the data items stored in the Iceberg table.
  */
-class IcebergDocument[T >: Null <: AnyRef](
+private[storage] class IcebergDocument[T >: Null <: AnyRef](
     val tableNamespace: String,
     val tableName: String,
     val tableSchema: org.apache.iceberg.Schema,
@@ -39,15 +38,6 @@
   @transient lazy val catalog: Catalog = IcebergCatalogInstance.getInstance()
 
-  // During construction, create or override the table
-  IcebergUtil.createTable(
-    catalog,
-    tableNamespace,
-    tableName,
-    tableSchema,
-    overrideIfExists = true
-  )
-
   /**
     * Returns the URI of the table location.
     * @throws NoSuchTableException if the table does not exist.
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
index 3bfcea98539..1f79b649c18 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.ArrayBuffer
  * @param serde a function to serialize `T` into an Iceberg `Record`.
  * @tparam T the type of the data items written to the table.
  */
-class IcebergTableWriter[T](
+private[storage] class IcebergTableWriter[T](
     val writerIdentifier: String,
     val catalog: Catalog,
     val tableNamespace: String,
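With table creation removed from the constructor (and the Iceberg classes now `private[storage]`), callers go through `DocumentFactory` for both steps. A minimal sketch of the new lifecycle, using only calls visible in this diff, with placeholder identities:

```scala
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.PortIdentity

def materialize(schema: Schema): VirtualDocument[Tuple] = {
  val uri = VFSURIFactory.createResultURI(
    WorkflowIdentity(0),
    ExecutionIdentity(0),
    OperatorIdentity("sampleOp"), // placeholder operator id
    PortIdentity()
  )
  // Step 1: create the backing table (what the IcebergDocument constructor used to do).
  DocumentFactory.createDocument(uri, schema)
  // Step 2: open it for reads/writes; openDocument also returns the stored schema.
  DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
}
```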
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala
index 14d3d82ca36..4763d9bd6eb 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala
@@ -21,13 +21,6 @@ trait VirtualDocumentSpec[T] extends AnyFlatSpec with BeforeAndAfterEach {
    */
   def getDocument: VirtualDocument[T]
 
-  /**
-    * Checks if the document has been cleared.
-    * Subclasses should override this to provide their specific check.
-    * @return true if the document is cleared, false otherwise.
-    */
-  def isDocumentCleared: Boolean
-
   // VirtualDocument instance for each test
   var document: VirtualDocument[T] = _
 
@@ -96,7 +89,6 @@
     document.clear()
 
     // Check if the document is cleared
-    assert(isDocumentCleared, "The document should be cleared after calling clear.")
     assert(document.get().isEmpty, "The document should have no items after clearing.")
   }
 
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 250ffdb1ed9..16f45798255 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -1,9 +1,21 @@
 package edu.uci.ics.amber.storage.result.iceberg
 
-import edu.uci.ics.amber.core.storage.{IcebergCatalogInstance, StorageConfig}
-import edu.uci.ics.amber.core.storage.model.VirtualDocumentSpec
+import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
+import edu.uci.ics.amber.core.storage.{
+  DocumentFactory,
+  IcebergCatalogInstance,
+  StorageConfig,
+  VFSURIFactory
+}
+import edu.uci.ics.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
 import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
 import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import edu.uci.ics.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  WorkflowIdentity
+}
+import edu.uci.ics.amber.core.workflow.PortIdentity
 import edu.uci.ics.amber.util.IcebergUtil
 import org.apache.iceberg.catalog.Catalog
 import org.apache.iceberg.data.Record
@@ -11,6 +23,7 @@ import org.apache.iceberg.{Schema => IcebergSchema}
 import org.apache.iceberg.catalog.TableIdentifier
 import org.scalatest.BeforeAndAfterAll
 
+import java.net.URI
 import java.sql.Timestamp
 import java.util.UUID
 
@@ -22,7 +35,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
   var deserde: (IcebergSchema, Record) => Tuple = _
   var catalog: Catalog = _
   val tableNamespace = "test_namespace"
-  var tableName: String = _
+  var uri: URI = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -57,7 +70,13 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
 
   override def beforeEach(): Unit = {
     // Generate a unique table name for each test
-    tableName = s"test_table_${UUID.randomUUID().toString.replace("-", "")}"
+    uri = VFSURIFactory.createResultURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+      PortIdentity()
+    )
+    DocumentFactory.createDocument(uri, amberSchema)
     super.beforeEach()
   }
 
@@ -65,21 +84,8 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     super.afterAll()
   }
 
-  // Implementation of getDocument
-  override def getDocument: IcebergDocument[Tuple] = {
-    new IcebergDocument[Tuple](
-      tableNamespace,
-      tableName,
-      icebergSchema,
-      serde,
-      deserde
-    )
-  }
-
-  // Implementation of isDocumentCleared
-  override def isDocumentCleared: Boolean = {
-    val identifier = TableIdentifier.of(tableNamespace, tableName)
-    !catalog.tableExists(identifier)
+  override def getDocument: VirtualDocument[Tuple] = {
+    DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
   }
 
   override def generateSampleItems(): List[Tuple] = {
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
index c7fd35a93bc..8e5db0fe47d 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
@@ -1,7 +1,8 @@
 package edu.uci.ics.amber.operator
 
 import edu.uci.ics.amber.core.executor.{OpExecSink, OpExecSource}
-import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
+import edu.uci.ics.amber.core.storage.VFSURIFactory
+import edu.uci.ics.amber.core.storage.DocumentFactory
 import edu.uci.ics.amber.core.tuple.Schema
 import edu.uci.ics.amber.core.virtualidentity.{
   ExecutionIdentity,
@@ -17,20 +18,20 @@ import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode.{
 import edu.uci.ics.amber.core.workflow._
 import edu.uci.ics.amber.operator.sink.ProgressiveUtils
 
+import java.net.URI
+
 object SpecialPhysicalOpFactory {
   def newSinkPhysicalOp(
-      workflowIdentity: WorkflowIdentity,
-      executionIdentity: ExecutionIdentity,
-      storageKey: String,
+      uri: URI,
       outputMode: OutputMode
   ): PhysicalOp = {
-    val (opId, portId) = OpResultStorage.decodeStorageKey(storageKey)
+    val (workflowIdentity, executionIdentity, opId, portId, _) = VFSURIFactory.decodeURI(uri)
     PhysicalOp
       .localPhysicalOp(
-        PhysicalOpIdentity(opId, s"sink${portId.id}"),
+        PhysicalOpIdentity(opId, s"sink${portId.get.id}"),
         workflowIdentity,
         executionIdentity,
-        OpExecSink(storageKey, workflowIdentity, outputMode)
+        OpExecSink(uri.toString, workflowIdentity, outputMode)
       )
       .withInputPorts(List(InputPort(PortIdentity(internal = true))))
       .withOutputPorts(List(OutputPort(PortIdentity(internal = true))))
@@ -66,23 +67,26 @@ object SpecialPhysicalOpFactory {
   def newSourcePhysicalOp(
       workflowIdentity: WorkflowIdentity,
       executionIdentity: ExecutionIdentity,
-      storageKey: String
+      uri: URI
   ): PhysicalOp = {
-    val (opId, portId) = OpResultStorage.decodeStorageKey(storageKey)
-    val opResultStorage = ResultStorage.getOpResultStorage(workflowIdentity)
+    val (_, _, opId, portId, _) = VFSURIFactory.decodeURI(uri)
     val outputPort = OutputPort()
     PhysicalOp
       .sourcePhysicalOp(
-        PhysicalOpIdentity(opId, s"source${portId.id}"),
+        PhysicalOpIdentity(opId, s"source${portId.get.id}"),
         workflowIdentity,
         executionIdentity,
-        OpExecSource(storageKey, workflowIdentity)
+        OpExecSource(uri.toString, workflowIdentity)
      )
       .withInputPorts(List.empty)
       .withOutputPorts(List(outputPort))
       .withPropagateSchema(
-        SchemaPropagationFunc(_ => Map(outputPort.id -> opResultStorage.getSchema(storageKey)))
+        SchemaPropagationFunc(_ =>
+          Map(outputPort.id -> {
+            DocumentFactory.openDocument(uri)._2.get
+          })
+        )
       )
       .propagateSchema()
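A note on the factory change above: the sink now recovers workflow and execution identity from the URI itself via `decodeURI`, and `portId` comes back as an `Option` (hence `portId.get.id`). Roughly, both special ops can now be wired from a single URI that points to an already-created document; this sketch uses only the signatures shown in this diff:

```scala
import java.net.URI
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory

def wireMaterialization(
    workflowId: WorkflowIdentity,
    executionId: ExecutionIdentity,
    uri: URI, // must point to a document that already exists
    outputMode: OutputMode
) = {
  // Sink: identities are decoded from the URI, so they are no longer parameters.
  val sink = SpecialPhysicalOpFactory.newSinkPhysicalOp(uri, outputMode)
  // Source: identities stay explicit; its output schema is read back from storage.
  val source = SpecialPhysicalOpFactory.newSourcePhysicalOp(workflowId, executionId, uri)
  (sink, source)
}
```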
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
index 99ac9796820..0ae9742ca12 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
@@ -1,24 +1,23 @@
 package edu.uci.ics.amber.operator.sink
 
 import edu.uci.ics.amber.core.executor.SinkOperatorExecutor
-import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
-import edu.uci.ics.amber.core.storage.result.ResultStorage
+import edu.uci.ics.amber.core.storage.DocumentFactory
+import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
+import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
 import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
-import edu.uci.ics.amber.core.virtualidentity.WorkflowIdentity
 import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
 import edu.uci.ics.amber.core.workflow.PortIdentity
 
+import java.net.URI
+
 class ProgressiveSinkOpExec(
     workerId: Int,
     outputMode: OutputMode,
-    storageKey: String,
-    workflowIdentity: WorkflowIdentity
+    storageURI: URI
 ) extends SinkOperatorExecutor {
+  val (doc, _) = DocumentFactory.openDocument(storageURI)
   val writer: BufferedItemWriter[Tuple] =
-    ResultStorage
-      .getOpResultStorage(workflowIdentity)
-      .get(storageKey)
-      .writer(workerId.toString)
+    doc.writer(workerId.toString).asInstanceOf[BufferedItemWriter[Tuple]]
 
   override def open(): Unit = {
     writer.open()
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
index f8dac9d938f..09497e3d7a7 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
@@ -2,14 +2,21 @@ package edu.uci.ics.amber.operator.source.cache
 
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
-import edu.uci.ics.amber.core.storage.result.ResultStorage
-import edu.uci.ics.amber.core.tuple.TupleLike
-import edu.uci.ics.amber.core.virtualidentity.WorkflowIdentity
+import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
+import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
+import edu.uci.ics.amber.core.storage.model.VirtualDocument
+import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
 
-class CacheSourceOpExec(storageKey: String, workflowIdentity: WorkflowIdentity)
-    extends SourceOperatorExecutor
-    with LazyLogging {
-  private val storage = ResultStorage.getOpResultStorage(workflowIdentity).get(storageKey)
+import java.net.URI
+
+class CacheSourceOpExec(storageUri: URI) extends SourceOperatorExecutor with LazyLogging {
+  val (_, _, _, _, resourceType) = VFSURIFactory.decodeURI(storageUri)
+  if (resourceType != MATERIALIZED_RESULT) {
+    throw new RuntimeException("The storage URI must point to a materialized result storage")
+  }
+
+  private val storage =
+    DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]]
 
   override def produceTuple(): Iterator[TupleLike] = storage.get()
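The two executors above form a write/read pair over the same document. A condensed sketch of the round trip, assuming `BufferedItemWriter` exposes the usual `open`/`putOne`/`close` lifecycle (the `putOne` name is an assumption; it is not shown in this diff):

```scala
import java.net.URI
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
import edu.uci.ics.amber.core.tuple.Tuple

// `uri` must reference a document already created via DocumentFactory.createDocument.
def roundTrip(uri: URI, tuples: Seq[Tuple]): Iterator[Tuple] = {
  val doc = DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]

  // Sink side: one buffered writer per worker, keyed by worker index.
  val writer = doc.writer("0").asInstanceOf[BufferedItemWriter[Tuple]]
  writer.open()
  tuples.foreach(writer.putOne) // assumed append API
  writer.close()

  // Source side: CacheSourceOpExec reads the same document back with get().
  doc.get()
}
```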
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/FileScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/FileScanSourceOpExec.scala
index 1d786d31d3a..c79a8306a81 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -23,7 +23,7 @@ class FileScanSourceOpExec private[scan] (
   override def produceTuple(): Iterator[TupleLike] = {
     var filenameIt: Iterator[String] = Iterator.empty
     val fileEntries: Iterator[InputStream] = {
-      val is = DocumentFactory.newReadonlyDocument(new URI(desc.fileName.get)).asInputStream()
+      val is = DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asInputStream()
       if (desc.extract) {
         val inputStream: ArchiveInputStream = new ArchiveStreamFactory().createArchiveInputStream(
           new BufferedInputStream(is)
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpDesc.scala
index 135ae2b6657..8f63a02af4c 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpDesc.scala
@@ -54,7 +54,7 @@ class ArrowSourceOpDesc extends ScanSourceOpDesc {
     */
   @Override
   def inferSchema(): Schema = {
-    val file = DocumentFactory.newReadonlyDocument(new URI(fileName.get)).asFile()
+    val file = DocumentFactory.openReadonlyDocument(new URI(fileName.get)).asFile()
     val allocator = new RootAllocator()
 
     Using
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
index 548d0734bb8..2ec291bfa7b 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
@@ -23,7 +23,7 @@ class ArrowSourceOpExec(
   override def open(): Unit = {
     try {
-      val file = DocumentFactory.newReadonlyDocument(new URI(desc.fileName.get)).asFile()
+      val file = DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asFile()
       val alloc = new RootAllocator()
       allocator = Some(alloc)
       val channel = Files.newByteChannel(file.toPath, StandardOpenOption.READ)
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
index f689611c5f8..d4e1c2ef156 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
@@ -61,7 +61,7 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {
     if (customDelimiter.isEmpty || !fileResolved()) {
       return null
     }
-    val stream = DocumentFactory.newReadonlyDocument(new URI(fileName.get)).asInputStream()
+    val stream = DocumentFactory.openReadonlyDocument(new URI(fileName.get)).asInputStream()
     val inputReader =
       new InputStreamReader(stream, fileEncoding.getCharset)
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
index a111219440c..df5953371ef 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
@@ -58,7 +58,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
 
   override def open(): Unit = {
     inputReader = new InputStreamReader(
-      DocumentFactory.newReadonlyDocument(new URI(desc.fileName.get)).asInputStream(),
+      DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asInputStream(),
       desc.fileEncoding.getCharset
     )
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
index c5bffbf2080..3e362ec7325 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
@@ -63,7 +63,7 @@ class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc {
     if (customDelimiter.isEmpty || !fileResolved()) {
       return null
     }
-    val file = DocumentFactory.newReadonlyDocument(new URI(fileName.get)).asFile()
+    val file = DocumentFactory.openReadonlyDocument(new URI(fileName.get)).asFile()
 
     implicit object CustomFormat extends DefaultCSVFormat {
       override val delimiter: Char = customDelimiter.get.charAt(0)
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
index 9bdde254c79..108050c3978 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
@@ -71,7 +71,7 @@ class ParallelCSVScanSourceOpExec private[csv] (
   override def open(): Unit = {
     // here, the stream requires to be seekable, so datasetFileDesc creates a temp file here
     // TODO: consider a better way
-    val file = DocumentFactory.newReadonlyDocument(new URI(desc.fileName.get)).asFile()
+    val file = DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asFile()
     val totalBytes: Long = file.length()
     // TODO: add support for limit
     // TODO: add support for offset
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
index f4a3c427fc7..97be3d95b24 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
@@ -60,7 +60,7 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc {
       return null
     }
     // infer schema from the first few lines of the file
-    val file = DocumentFactory.newReadonlyDocument(new URI(fileName.get)).asFile()
+    val file = DocumentFactory.openReadonlyDocument(new URI(fileName.get)).asFile()
     implicit object CustomFormat extends DefaultCSVFormat {
       override val delimiter: Char = customDelimiter.get.charAt(0)
     }
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
index 28241ea3cf5..74753470961 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
@@ -46,7 +46,7 @@ class CSVOldScanSourceOpExec private[csvOld] (
     implicit object CustomFormat extends DefaultCSVFormat {
       override val delimiter: Char = desc.customDelimiter.get.charAt(0)
     }
-    val filePath = DocumentFactory.newReadonlyDocument(new URI(desc.fileName.get)).asFile().toPath
+    val filePath = DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asFile().toPath
     reader = CSVReader.open(filePath.toString, desc.fileEncoding.getCharset.name())(CustomFormat)
     // skip line if this worker reads the start of a file, and the file has a header line
     val startOffset = desc.offset.getOrElse(0) + (if (desc.hasHeader) 1 else 0)
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpDesc.scala
index f0d7eb0c789..30d0e58d045 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpDesc.scala
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
 import com.fasterxml.jackson.databind.JsonNode
 import edu.uci.ics.amber.core.executor.OpExecWithClassName
 import edu.uci.ics.amber.core.storage.DocumentFactory
-import edu.uci.ics.amber.core.storage.model.DatasetFileDocument
 import edu.uci.ics.amber.core.tuple.AttributeTypeUtils.inferSchemaFromRows
 import edu.uci.ics.amber.core.tuple.{Attribute, Schema}
 import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
@@ -25,14 +24,6 @@ class JSONLScanSourceOpDesc extends ScanSourceOpDesc {
 
   fileTypeName = Option("JSONL")
 
-  def createInputStream(filepath: String, fileDesc: DatasetFileDocument): InputStream = {
-    if (filepath != null) {
-      new FileInputStream(filepath)
-    } else {
-      fileDesc.asInputStream()
-    }
-  }
-
   @throws[IOException]
   override def getPhysicalOp(
       workflowId: WorkflowIdentity,
@@ -61,7 +52,7 @@ class JSONLScanSourceOpDesc extends ScanSourceOpDesc {
     if (!fileResolved()) {
       return null
     }
-    val stream = DocumentFactory.newReadonlyDocument(new URI(fileName.get)).asInputStream()
+    val stream = DocumentFactory.openReadonlyDocument(new URI(fileName.get)).asInputStream()
     val reader = new BufferedReader(new InputStreamReader(stream, fileEncoding.getCharset))
 
     var fieldNames = Set[String]()
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
index ec4490d6964..9c4e9b01e02 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
@@ -39,7 +39,7 @@ class JSONLScanSourceOpExec private[json] (
   }
 
   override def open(): Unit = {
-    val stream = DocumentFactory.newReadonlyDocument(new URI(desc.fileName.get)).asInputStream()
+    val stream = DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asInputStream()
     // count lines and partition the task to each worker
     reader = new BufferedReader(
      new InputStreamReader(stream, desc.fileEncoding.getCharset)