Skip to content

Commit

Permalink
fix test and add sanity check
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Jan 12, 2025
1 parent 5ba5af1 commit 7d406eb
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.result.OpResultStorage
import edu.uci.ics.amber.core.workflow.{WorkflowContext, WorkflowSettings}
import edu.uci.ics.amber.engine.architecture.controller._
import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._
Expand All @@ -28,15 +27,12 @@ class BatchSizePropagationSpec

implicit val timeout: Timeout = Timeout(5.seconds)

val resultStorage = new OpResultStorage()

override def beforeAll(): Unit = {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
resultStorage.clear()
}

def verifyBatchSizeInPartitioning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import akka.util.Timeout
import ch.vorburger.mariadb4j.DB
import com.twitter.util.{Await, Duration, Promise}
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ExecutionResourcesMapping}
import edu.uci.ics.amber.core.storage.{DocumentFactory, FileResolver}
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple}
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.controller._
Expand Down Expand Up @@ -38,8 +41,6 @@ class DataProcessingSpec

var inMemoryMySQLInstance: Option[DB] = None
val workflowContext: WorkflowContext = new WorkflowContext()
val resultStorage: OpResultStorage =
ExecutionResourcesMapping.getResourceURIs(workflowContext.workflowId)

override def beforeAll(): Unit = {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
Expand All @@ -48,7 +49,6 @@ class DataProcessingSpec

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
resultStorage.clear()
}

def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = {
Expand All @@ -70,16 +70,34 @@ class DataProcessingSpec
.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) {
results = workflow.logicalPlan.getTerminalOperatorIds
.filter(terminalOpId =>
.filter(terminalOpId => {
val uri = FileResolver.resolve(
RESULT,
workflowContext.workflowId,
workflowContext.executionId,
terminalOpId,
Some(PortIdentity())
)
// expecting the first output port only.
resultStorage.contains(OpResultStorage.createStorageKey(terminalOpId, PortIdentity()))
)
.map(terminalOpId =>
terminalOpId -> resultStorage
.get(OpResultStorage.createStorageKey(terminalOpId, PortIdentity()))
ExecutionResourcesMapping
.getResourceURIs(workflowContext.executionId)
.contains(uri)
})
.map(terminalOpId => {
val uri = FileResolver.resolve(
RESULT,
workflowContext.workflowId,
workflowContext.executionId,
terminalOpId,
Some(PortIdentity())
)
terminalOpId -> DocumentFactory
.openDocument(uri)
._1
.asInstanceOf[VirtualDocument[Tuple]]
.get()
.toList
)
})
.toMap
completion.setDone()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.util.Timeout
import com.twitter.util.{Await, Promise}
import com.typesafe.scalalogging.Logger
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.result.OpResultStorage
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate}
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
Expand Down Expand Up @@ -45,7 +44,6 @@ class PauseSpec
operators: List[LogicalOp],
links: List[LogicalLink]
): Unit = {
val resultStorage = new OpResultStorage()
val workflow = TestUtils.buildWorkflow(operators, links, new WorkflowContext())
val client =
new AmberClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package edu.uci.ics.amber.engine.faulttolerance
import akka.actor.{ActorSystem, Props}
import akka.serialization.SerializationExtension
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.result.OpResultStorage
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ControllerProcessor}
import edu.uci.ics.amber.engine.architecture.worker.DataProcessor
Expand All @@ -21,7 +20,6 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll {

var system: ActorSystem = _

val resultStorage = new OpResultStorage()
val csvOpDesc = TestOperators.mediumCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
val workflow = buildWorkflow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ package edu.uci.ics.amber.operator.source.cache
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.FileResolver.decodeVFSUri
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}

import java.net.URI

class CacheSourceOpExec(storageUri: URI) extends SourceOperatorExecutor with LazyLogging {
val (_, _, _, _, resourceType) = decodeVFSUri(storageUri)
if (resourceType != MATERIALIZED_RESULT) {
throw new RuntimeException("The storage URI must point to an materialized result storage")
}

private val storage =
DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]]

Expand Down

0 comments on commit 7d406eb

Please sign in to comment.