diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 00428d66fc..f7729e4a81 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -1,7 +1,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala index 7f68b233f9..1dd0cd334e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala @@ -6,7 +6,6 @@ import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.web.model.common.AccessEntry import edu.uci.ics.texera.dao.jooq.generated.Tables.USER import edu.uci.ics.texera.dao.jooq.generated.enums.DatasetUserAccessPrivilege -import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetUserAccessDao, UserDao} import 
edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{DatasetUserAccess, User} @@ -14,8 +13,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResou context, getOwner } -import org.jooq.{Condition, DSLContext} -import org.jooq.impl.DSL +import org.jooq.DSLContext import org.jooq.types.UInteger import java.util diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala index fd3bfaf796..35e3883b0d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala @@ -25,7 +25,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ DatasetVersion, User } -import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{context, _} +import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _} import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.DatasetFileNode import io.dropwizard.auth.Auth diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 317930056c..ab6cec64c5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -4,7 +4,6 @@ import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord} import 
edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity} -import edu.uci.ics.amber.engine.common.AmberConfig import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.web.auth.SessionUser import edu.uci.ics.texera.dao.jooq.generated.Tables.{ diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 09163a3377..664a9dbc79 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName} import com.fasterxml.jackson.databind.node.ObjectNode import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB -import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT} +import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} import edu.uci.ics.amber.core.storage.result._ @@ -35,10 +35,8 @@ import edu.uci.ics.texera.web.model.websocket.event.{ WebResultUpdateEvent } import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} -import org.jooq.types.UInteger import java.util.UUID import scala.collection.mutable diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala index 078efbb2c9..8d40867e90 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala @@ -8,7 +8,6 @@ import com.google.api.services.drive.model.{File, FileList, Permission} import com.google.api.services.sheets.v4.Sheets import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange} import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.engine.common.Utils.retry diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala index 6aa32d84d9..b72b52ba62 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala @@ -22,13 +22,11 @@ import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, WorkflowIdentity } -import edu.uci.ics.amber.core.workflow.WorkflowContext.DEFAULT_EXECUTION_ID import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent import edu.uci.ics.texera.web.model.websocket.request.WorkflowExecuteRequest -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import edu.uci.ics.texera.web.service.WorkflowService.mkWorkflowStateId import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState import 
edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} @@ -36,7 +34,6 @@ import edu.uci.ics.texera.web.{SubscriptionManager, WorkflowLifecycleManager} import edu.uci.ics.texera.workflow.LogicalPlan import io.reactivex.rxjava3.disposables.{CompositeDisposable, Disposable} import io.reactivex.rxjava3.subjects.BehaviorSubject -import org.jooq.types.UInteger import play.api.libs.json.Json import java.net.URI @@ -101,6 +98,7 @@ class WorkflowService( case _: Throwable => // exception can be raised if the document is already cleared } ) + ExecutionResourcesMapping.removeExecutionResources(eid) }) WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId)) if (executionService.getValue != null) { @@ -185,6 +183,7 @@ class WorkflowService( case _: Throwable => } ) + ExecutionResourcesMapping.removeExecutionResources(eid) }) // TODO: change this behavior after enabling cache. workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution( diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index f811dee6ae..9aae707362 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -1,7 +1,6 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 81decb71c5..601915ecbb 100644 --- 
a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -8,7 +8,6 @@ import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala index 4927695868..0f767a8d93 100644 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala @@ -1,7 +1,7 @@ package edu.uci.ics.amber.engine.common.storage import edu.uci.ics.amber.core.storage.DocumentFactory -import edu.uci.ics.amber.core.storage.model.{ReadonlyLocalFileDocument, ReadonlyVirtualDocument} +import edu.uci.ics.amber.core.storage.model.ReadonlyVirtualDocument import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala index fcb391cba7..bb148c6ab0 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala @@ -1,12 +1,5 @@ package edu.uci.ics.amber.core.storage 
-import edu.uci.ics.amber.core.storage.FileResolver.DATASET_FILE_URI_SCHEME -import edu.uci.ics.amber.core.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} -import edu.uci.ics.amber.core.workflow.PortIdentity import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala index 511c39e41e..c315aca0bc 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala @@ -1,6 +1,6 @@ package edu.uci.ics.amber.core.storage.result -import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import java.net.URI import scala.collection.mutable @@ -43,4 +43,14 @@ object ExecutionResourcesMapping { case None => Some(List(uri)) // Create a new list if key doesn't exist } } + + /** + * Remove the mapping entry (the recorded resource URIs) for a given execution ID; the resources themselves are not cleared here.
+ * + * @param executionIdentity the target execution ID + * @return true if the entry was removed, false if it did not exist + */ + def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean = { + executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined + } } diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 16f4579825..5b482a1e09 100644 --- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -1,6 +1,5 @@ package edu.uci.ics.amber.storage.result.iceberg -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.{ DocumentFactory, IcebergCatalogInstance, @@ -8,7 +7,6 @@ import edu.uci.ics.amber.core.storage.{ VFSURIFactory } import edu.uci.ics.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec} -import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, @@ -20,7 +18,6 @@ import edu.uci.ics.amber.util.IcebergUtil import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.data.Record import org.apache.iceberg.{Schema => IcebergSchema} -import org.apache.iceberg.catalog.TableIdentifier import org.scalatest.BeforeAndAfterAll import java.net.URI diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala index 3b5a60f222..2e6d29f429 100644 --- 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import edu.uci.ics.amber.core.executor.OpExecWithClassName import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType} import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.operator.map.MapOpDesc import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala index 120b599691..80bc4d4e64 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.projection import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.executor.OpExecWithClassName -import edu.uci.ics.amber.core.tuple.{Attribute, Schema} +import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.core.workflow.PhysicalOp.oneToOnePhysicalOp import edu.uci.ics.amber.core.workflow._ diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala index 0ae9742ca1..9aacf8ddaf 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala @@ -2,8 +2,7 @@ package edu.uci.ics.amber.operator.sink import edu.uci.ics.amber.core.executor.SinkOperatorExecutor import edu.uci.ics.amber.core.storage.DocumentFactory -import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument} -import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping +import edu.uci.ics.amber.core.storage.model.BufferedItemWriter import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode import edu.uci.ics.amber.core.workflow.PortIdentity