Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the corresponding resource entry from the global ExecutionResourceMapping during the garbage collection #3216

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.model.common.AccessEntry
import edu.uci.ics.texera.dao.jooq.generated.Tables.USER
import edu.uci.ics.texera.dao.jooq.generated.enums.DatasetUserAccessPrivilege
import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetUserAccessDao, UserDao}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{DatasetUserAccess, User}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{
context,
getOwner
}
import org.jooq.{Condition, DSLContext}
import org.jooq.impl.DSL
import org.jooq.DSLContext
import org.jooq.types.UInteger

import java.util
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
DatasetVersion,
User
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{context, _}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.DatasetFileNode
import io.dropwizard.auth.Auth
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord}
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity}
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.dao.jooq.generated.Tables.{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result._
Expand Down Expand Up @@ -35,10 +35,8 @@ import edu.uci.ics.texera.web.model.websocket.event.{
WebResultUpdateEvent
}
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import org.jooq.types.UInteger

import java.util.UUID
import scala.collection.mutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.google.api.services.drive.model.{File, FileList, Permission}
import com.google.api.services.sheets.v4.Sheets
import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange}
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.engine.common.Utils.retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
WorkflowIdentity
}
import edu.uci.ics.amber.core.workflow.WorkflowContext.DEFAULT_EXECUTION_ID
import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent
import edu.uci.ics.texera.web.model.websocket.request.WorkflowExecuteRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.service.WorkflowService.mkWorkflowStateId
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import edu.uci.ics.texera.web.{SubscriptionManager, WorkflowLifecycleManager}
import edu.uci.ics.texera.workflow.LogicalPlan
import io.reactivex.rxjava3.disposables.{CompositeDisposable, Disposable}
import io.reactivex.rxjava3.subjects.BehaviorSubject
import org.jooq.types.UInteger
import play.api.libs.json.Json

import java.net.URI
Expand Down Expand Up @@ -101,6 +98,7 @@ class WorkflowService(
case _: Throwable => // exception can be raised if the document is already cleared
}
)
ExecutionResourcesMapping.removeExecutionResources(eid)
})
WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId))
if (executionService.getValue != null) {
Expand Down Expand Up @@ -185,6 +183,7 @@ class WorkflowService(
case _: Throwable =>
}
)
ExecutionResourcesMapping.removeExecutionResources(eid)
}) // TODO: change this behavior after enabling cache.

workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package edu.uci.ics.texera.workflow

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import ch.vorburger.mariadb4j.DB
import com.twitter.util.{Await, Duration, Promise}
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package edu.uci.ics.amber.engine.common.storage

import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.{ReadonlyLocalFileDocument, ReadonlyVirtualDocument}
import edu.uci.ics.amber.core.storage.model.ReadonlyVirtualDocument
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.core.storage.FileResolver.DATASET_FILE_URI_SCHEME
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
WorkflowIdentity
}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package edu.uci.ics.amber.core.storage.result

import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity

import java.net.URI
import scala.collection.mutable
Expand Down Expand Up @@ -43,4 +43,14 @@ object ExecutionResourcesMapping {
case None => Some(List(uri)) // Create a new list if key doesn't exist
}
}

/**
* Remove all resources associated with a given execution ID.
*
* @param executionIdentity the target execution ID
* @return true if the entry was removed, false if it did not exist
*/
def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean = {
executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package edu.uci.ics.amber.storage.result.iceberg

import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.{
DocumentFactory,
IcebergCatalogInstance,
StorageConfig,
VFSURIFactory
}
import edu.uci.ics.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
Expand All @@ -20,7 +18,6 @@ import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
import org.apache.iceberg.catalog.TableIdentifier
import org.scalatest.BeforeAndAfterAll

import java.net.URI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType}
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc}
import edu.uci.ics.amber.operator.map.MapOpDesc
import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.projection
import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.tuple.{Attribute, Schema}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.PhysicalOp.oneToOnePhysicalOp
import edu.uci.ics.amber.core.workflow._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package edu.uci.ics.amber.operator.sink

import edu.uci.ics.amber.core.executor.SinkOperatorExecutor
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.core.workflow.PortIdentity
Expand Down
Loading