diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index e0d82451c5..92911a1fc5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with val storageType = collection.get("storageType").asText() val collectionName = collection.get("storageKey").asText() storageType match { - case OpResultStorage.MEMORY | OpResultStorage.ICEBERG => + case OpResultStorage.ICEBERG => // rely on the server-side result cleanup logic. case OpResultStorage.MONGODB => MongoDatabaseManager.dropCollection(collectionName) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 7b51e853fd..d3311ea403 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -23,7 +23,6 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService import io.dropwizard.auth.Auth -import org.jooq.impl.DSL._ import org.jooq.types.{UInteger, ULong} import java.net.URI diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala deleted file mode 100644 index b508bb461c..0000000000 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala +++ /dev/null @@ -1,107 +0,0 @@ -package edu.uci.ics.amber.core.storage.result - -import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument} - -import java.net.URI -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -/** - * Global storage mapping for all MemoryDocuments. - * This ensures that 2 MemoryDocuments opened on the same key will operate on the same ArrayBuffer. - */ -object MemoryDocument { - val storageMapping: mutable.Map[String, ArrayBuffer[AnyRef]] = - mutable.HashMap[String, ArrayBuffer[AnyRef]]() -} - -/** - * MemoryDocument provides an in-memory implementation of VirtualDocument and BufferedItemWriter. - * It stores items in a mutable ArrayBuffer and supports various operations like read, write, and remove. - * @tparam T the type of data items stored in the document. - */ -class MemoryDocument[T >: Null <: AnyRef](key: String) - extends VirtualDocument[T] - with BufferedItemWriter[T] { - - /** - * Internal storage for the document's items, retrieved from the shared mapping. - */ - private def results: ArrayBuffer[T] = { - if (!MemoryDocument.storageMapping.contains(key)) { - MemoryDocument.storageMapping(key) = new ArrayBuffer[AnyRef]() - } - MemoryDocument.storageMapping(key).asInstanceOf[ArrayBuffer[T]] - } - - override def getURI: URI = - throw new UnsupportedOperationException("getURI is not supported for MemoryDocument") - - override def clear(): Unit = - synchronized { - results.clear() - } - - override def get(): Iterator[T] = - synchronized { - results.to(Iterator) - } - - override def getItem(i: Int): T = - synchronized { - results.apply(i) - } - - override def getRange(from: Int, until: Int): Iterator[T] = - synchronized { - results.slice(from, until).to(Iterator) - } - - override def getAfter(offset: Int): Iterator[T] = - synchronized { - results.slice(offset, results.size).to(Iterator) - } - - override def getCount: Long = results.length - - override def append(item: T): Unit = - synchronized { - results += item - } - - override def writer(writerIdentifier: String): BufferedItemWriter[T] = this - - /** - * The size of the buffer for the buffered writer. This number is not used currently - */ - override val bufferSize: Int = 1024 - - /** - * Open the writer. This method does nothing for MemoryDocument. - */ - override def open(): Unit = {} - - /** - * Close the writer. This method does nothing for MemoryDocument. - */ - override def close(): Unit = {} - - /** - * Put one item into the buffer. For MemoryDocument, it simply adds the item to the internal storage. - * @param item the data item to be written. - */ - override def putOne(item: T): Unit = - synchronized { - results += item - } - - /** - * Remove one item from the buffer. For MemoryDocument, it removes the item from the internal storage. - * - * @param item the item to remove. - */ - override def removeOne(item: T): Unit = - synchronized { - results -= item - } -} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala index eb29595016..52e916be33 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala @@ -20,7 +20,6 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala */ object OpResultStorage { val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase - val MEMORY: String = "memory" val MONGODB: String = "mongodb" val ICEBERG = "iceberg" @@ -115,9 +114,7 @@ class OpResultStorage extends Serializable with LazyLogging { schema: Schema ): VirtualDocument[Tuple] = { val storage: VirtualDocument[Tuple] = - if (mode == OpResultStorage.MEMORY) { - new MemoryDocument[Tuple](key) - } else if (mode == OpResultStorage.MONGODB) { + if (mode == OpResultStorage.MONGODB) { try { new MongoDocument[Tuple]( executionId + key, @@ -127,22 +124,13 @@ class OpResultStorage extends Serializable with LazyLogging { } catch { case t: Throwable => logger.warn("Failed to create MongoDB storage", t) - logger.info(s"Falling back to memory storage for $key") - new MemoryDocument[Tuple](key) + logger.info(s"Falling back to Iceberg storage for $key") + createIcebergDocument(executionId, key, schema) } + } else if (mode == OpResultStorage.ICEBERG) { + createIcebergDocument(executionId, key, schema) } else { - val icebergSchema = IcebergUtil.toIcebergSchema(schema) - val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (_, record) => - IcebergUtil.fromRecord(record, schema) - - new IcebergDocument[Tuple]( - StorageConfig.icebergTableNamespace, - executionId + key, - icebergSchema, - serde, - deserde - ) + throw new IllegalArgumentException(s"Unsupported storage mode: $mode") } cache.put(key, (storage, schema)) storage @@ -172,4 +160,23 @@ class OpResultStorage extends Serializable with LazyLogging { def getAllKeys: Set[String] = { cache.keySet().iterator().asScala.toSet } + + private def createIcebergDocument( + executionId: String, + key: String, + schema: Schema + ): IcebergDocument[Tuple] = { + val icebergSchema = IcebergUtil.toIcebergSchema(schema) + val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord + val deserde: (IcebergSchema, Record) => Tuple = (_, record) => + IcebergUtil.fromRecord(record, schema) + + new IcebergDocument[Tuple]( + StorageConfig.icebergTableNamespace, + executionId + key, + icebergSchema, + serde, + deserde + ) + } } diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 3c2a22c689..250ffdb1ed 100644 --- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -5,7 +5,6 @@ import edu.uci.ics.amber.core.storage.model.VirtualDocumentSpec import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} import edu.uci.ics.amber.util.IcebergUtil -import edu.uci.ics.texera.dao.MockTexeraDB import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.data.Record import org.apache.iceberg.{Schema => IcebergSchema}