Skip to content

Commit

Permalink
Remove the corresponding resource entry from the global ExecutionReso…
Browse files Browse the repository at this point in the history
…urceMapping during the garbage collection (#3216)

This PR completes the garbage collection logic under the
newly-introduced URIs and ExecutionResourceMapping.

During the garbage collection, after all the documents associated with
the last execution id are released, it then removes the entry from the
mapping.

This PR also did the scalafixAll to remove unused imports.
  • Loading branch information
bobbai00 authored Jan 15, 2025
1 parent 3ffa9bf commit 64007f7
Show file tree
Hide file tree
Showing 16 changed files with 19 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.model.common.AccessEntry
import edu.uci.ics.texera.dao.jooq.generated.Tables.USER
import edu.uci.ics.texera.dao.jooq.generated.enums.DatasetUserAccessPrivilege
import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetUserAccessDao, UserDao}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{DatasetUserAccess, User}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{
context,
getOwner
}
import org.jooq.{Condition, DSLContext}
import org.jooq.impl.DSL
import org.jooq.DSLContext
import org.jooq.types.UInteger

import java.util
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
DatasetVersion,
User
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{context, _}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.DatasetFileNode
import io.dropwizard.auth.Auth
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord}
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity}
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.dao.jooq.generated.Tables.{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result._
Expand Down Expand Up @@ -35,10 +35,8 @@ import edu.uci.ics.texera.web.model.websocket.event.{
WebResultUpdateEvent
}
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import org.jooq.types.UInteger

import java.util.UUID
import scala.collection.mutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.google.api.services.drive.model.{File, FileList, Permission}
import com.google.api.services.sheets.v4.Sheets
import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange}
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.engine.common.Utils.retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
WorkflowIdentity
}
import edu.uci.ics.amber.core.workflow.WorkflowContext.DEFAULT_EXECUTION_ID
import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent
import edu.uci.ics.texera.web.model.websocket.request.WorkflowExecuteRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.service.WorkflowService.mkWorkflowStateId
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import edu.uci.ics.texera.web.{SubscriptionManager, WorkflowLifecycleManager}
import edu.uci.ics.texera.workflow.LogicalPlan
import io.reactivex.rxjava3.disposables.{CompositeDisposable, Disposable}
import io.reactivex.rxjava3.subjects.BehaviorSubject
import org.jooq.types.UInteger
import play.api.libs.json.Json

import java.net.URI
Expand Down Expand Up @@ -101,6 +98,7 @@ class WorkflowService(
case _: Throwable => // exception can be raised if the document is already cleared
}
)
ExecutionResourcesMapping.removeExecutionResources(eid)
})
WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId))
if (executionService.getValue != null) {
Expand Down Expand Up @@ -185,6 +183,7 @@ class WorkflowService(
case _: Throwable =>
}
)
ExecutionResourcesMapping.removeExecutionResources(eid)
}) // TODO: change this behavior after enabling cache.

workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package edu.uci.ics.texera.workflow

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import ch.vorburger.mariadb4j.DB
import com.twitter.util.{Await, Duration, Promise}
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package edu.uci.ics.amber.engine.common.storage

import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.{ReadonlyLocalFileDocument, ReadonlyVirtualDocument}
import edu.uci.ics.amber.core.storage.model.ReadonlyVirtualDocument
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.core.storage.FileResolver.DATASET_FILE_URI_SCHEME
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
WorkflowIdentity
}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package edu.uci.ics.amber.core.storage.result

import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity

import java.net.URI
import scala.collection.mutable
Expand Down Expand Up @@ -43,4 +43,14 @@ object ExecutionResourcesMapping {
case None => Some(List(uri)) // Create a new list if key doesn't exist
}
}

/**
* Remove all resources associated with a given execution ID.
*
* @param executionIdentity the target execution ID
* @return true if the entry was removed, false if it did not exist
*/
def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean = {
executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package edu.uci.ics.amber.storage.result.iceberg

import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.{
DocumentFactory,
IcebergCatalogInstance,
StorageConfig,
VFSURIFactory
}
import edu.uci.ics.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
Expand All @@ -20,7 +18,6 @@ import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
import org.apache.iceberg.catalog.TableIdentifier
import org.scalatest.BeforeAndAfterAll

import java.net.URI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType}
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc}
import edu.uci.ics.amber.operator.map.MapOpDesc
import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.projection
import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.tuple.{Attribute, Schema}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.PhysicalOp.oneToOnePhysicalOp
import edu.uci.ics.amber.core.workflow._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package edu.uci.ics.amber.operator.sink

import edu.uci.ics.amber.core.executor.SinkOperatorExecutor
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.core.workflow.PortIdentity
Expand Down

0 comments on commit 64007f7

Please sign in to comment.