From 64007f799c51f1084c1ac0d0b42b3fbdba12a5dc Mon Sep 17 00:00:00 2001 From: Jiadong Bai <43344272+bobbai00@users.noreply.github.com> Date: Wed, 15 Jan 2025 15:26:03 -0800 Subject: [PATCH] Remove the corresponding resource entry from the global ExecutionResourceMapping during the garbage collection (#3216) This PR completes the garbage collection logic under the newly-introduced URIs and ExecutionResourceMapping. During the garbage collection, after all the documents associated with the last execution id are released, it then removes the entry from the mapping. This PR also did the scalafixAll to remove unused imports. --- .../architecture/scheduling/ScheduleGenerator.scala | 1 - .../user/dataset/DatasetAccessResource.scala | 4 +--- .../dashboard/user/dataset/DatasetResource.scala | 2 +- .../user/workflow/WorkflowExecutionsResource.scala | 1 - .../texera/web/service/ExecutionResultService.scala | 4 +--- .../ics/texera/web/service/ResultExportService.scala | 1 - .../uci/ics/texera/web/service/WorkflowService.scala | 5 ++--- .../uci/ics/texera/workflow/WorkflowCompiler.scala | 1 - .../ics/amber/engine/e2e/DataProcessingSpec.scala | 1 - .../storage/ReadonlyLocalFileDocumentSpec.scala | 2 +- .../uci/ics/amber/core/storage/FileResolver.scala | 7 ------- .../storage/result/ExecutionResourcesMapping.scala | 12 +++++++++++- .../storage/result/iceberg/IcebergDocumentSpec.scala | 3 --- .../dictionary/DictionaryMatcherOpDesc.scala | 1 - .../amber/operator/projection/ProjectionOpDesc.scala | 2 +- .../amber/operator/sink/ProgressiveSinkOpExec.scala | 3 +-- 16 files changed, 19 insertions(+), 31 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 00428d66fc..f7729e4a81 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -1,7 +1,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala index 7f68b233f9..1dd0cd334e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetAccessResource.scala @@ -6,7 +6,6 @@ import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.web.model.common.AccessEntry import edu.uci.ics.texera.dao.jooq.generated.Tables.USER import edu.uci.ics.texera.dao.jooq.generated.enums.DatasetUserAccessPrivilege -import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetUserAccessDao, UserDao} import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{DatasetUserAccess, User} @@ -14,8 +13,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResou context, getOwner } -import org.jooq.{Condition, DSLContext} -import org.jooq.impl.DSL +import org.jooq.DSLContext import org.jooq.types.UInteger import java.util diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala index fd3bfaf796..35e3883b0d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala @@ -25,7 +25,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ DatasetVersion, User } -import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{context, _} +import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _} import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.DatasetFileNode import io.dropwizard.auth.Auth diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 317930056c..ab6cec64c5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -4,7 +4,6 @@ import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord} import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity} -import edu.uci.ics.amber.engine.common.AmberConfig import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.web.auth.SessionUser import edu.uci.ics.texera.dao.jooq.generated.Tables.{ diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 09163a3377..664a9dbc79 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName} import com.fasterxml.jackson.databind.node.ObjectNode import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB -import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT} +import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} import edu.uci.ics.amber.core.storage.result._ @@ -35,10 +35,8 @@ import edu.uci.ics.texera.web.model.websocket.event.{ WebResultUpdateEvent } import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} -import org.jooq.types.UInteger import java.util.UUID import scala.collection.mutable diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala index 078efbb2c9..8d40867e90 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala @@ -8,7 +8,6 @@ import com.google.api.services.drive.model.{File, FileList, Permission} import com.google.api.services.sheets.v4.Sheets import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange} import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.engine.common.Utils.retry diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala index 6aa32d84d9..b72b52ba62 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala @@ -22,13 +22,11 @@ import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, WorkflowIdentity } -import edu.uci.ics.amber.core.workflow.WorkflowContext.DEFAULT_EXECUTION_ID import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent import edu.uci.ics.texera.web.model.websocket.request.WorkflowExecuteRequest -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import edu.uci.ics.texera.web.service.WorkflowService.mkWorkflowStateId import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} @@ -36,7 +34,6 @@ import edu.uci.ics.texera.web.{SubscriptionManager, WorkflowLifecycleManager} import edu.uci.ics.texera.workflow.LogicalPlan import io.reactivex.rxjava3.disposables.{CompositeDisposable, Disposable} import io.reactivex.rxjava3.subjects.BehaviorSubject -import org.jooq.types.UInteger import play.api.libs.json.Json import java.net.URI @@ -101,6 +98,7 @@ class WorkflowService( case _: Throwable => // exception can be raised if the document is already cleared } ) + ExecutionResourcesMapping.removeExecutionResources(eid) }) WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId)) if (executionService.getValue != null) { @@ -185,6 +183,7 @@ class WorkflowService( case _: Throwable => } ) + ExecutionResourcesMapping.removeExecutionResources(eid) }) // TODO: change this behavior after enabling cache. workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution( diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index f811dee6ae..9aae707362 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -1,7 +1,6 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 81decb71c5..601915ecbb 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -8,7 +8,6 @@ import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala index 4927695868..0f767a8d93 100644 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/common/storage/ReadonlyLocalFileDocumentSpec.scala @@ -1,7 +1,7 @@ package edu.uci.ics.amber.engine.common.storage import edu.uci.ics.amber.core.storage.DocumentFactory -import edu.uci.ics.amber.core.storage.model.{ReadonlyLocalFileDocument, ReadonlyVirtualDocument} +import edu.uci.ics.amber.core.storage.model.ReadonlyVirtualDocument import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala index fcb391cba7..bb148c6ab0 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala @@ -1,12 +1,5 @@ package edu.uci.ics.amber.core.storage -import edu.uci.ics.amber.core.storage.FileResolver.DATASET_FILE_URI_SCHEME -import edu.uci.ics.amber.core.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} -import edu.uci.ics.amber.core.workflow.PortIdentity import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala index 511c39e41e..c315aca0bc 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala @@ -1,6 +1,6 @@ package edu.uci.ics.amber.core.storage.result -import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity import java.net.URI import scala.collection.mutable @@ -43,4 +43,14 @@ object ExecutionResourcesMapping { case None => Some(List(uri)) // Create a new list if key doesn't exist } } + + /** + * Remove all resources associated with a given execution ID. + * + * @param executionIdentity the target execution ID + * @return true if the entry was removed, false if it did not exist + */ + def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean = { + executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined + } } diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 16f4579825..5b482a1e09 100644 --- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -1,6 +1,5 @@ package edu.uci.ics.amber.storage.result.iceberg -import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT import edu.uci.ics.amber.core.storage.{ DocumentFactory, IcebergCatalogInstance, @@ -8,7 +7,6 @@ import edu.uci.ics.amber.core.storage.{ VFSURIFactory } import edu.uci.ics.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec} -import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, @@ -20,7 +18,6 @@ import edu.uci.ics.amber.util.IcebergUtil import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.data.Record import org.apache.iceberg.{Schema => IcebergSchema} -import org.apache.iceberg.catalog.TableIdentifier import org.scalatest.BeforeAndAfterAll import java.net.URI diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala index 3b5a60f222..2e6d29f429 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/dictionary/DictionaryMatcherOpDesc.scala @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import edu.uci.ics.amber.core.executor.OpExecWithClassName import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType} import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.operator.map.MapOpDesc import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala index 120b599691..80bc4d4e64 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/projection/ProjectionOpDesc.scala @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.projection import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.executor.OpExecWithClassName -import edu.uci.ics.amber.core.tuple.{Attribute, Schema} +import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.core.workflow.PhysicalOp.oneToOnePhysicalOp import edu.uci.ics.amber.core.workflow._ diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala index 0ae9742ca1..9aacf8ddaf 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala @@ -2,8 +2,7 @@ package edu.uci.ics.amber.operator.sink import edu.uci.ics.amber.core.executor.SinkOperatorExecutor import edu.uci.ics.amber.core.storage.DocumentFactory -import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument} -import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping +import edu.uci.ics.amber.core.storage.model.BufferedItemWriter import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode import edu.uci.ics.amber.core.workflow.PortIdentity