Skip to content

Commit

Permalink
Remove MemoryDocument from result storage (#3201)
Browse files Browse the repository at this point in the history
This PR removes the `MemoryDocument` class and its usage. Additionally,
it updates the fallback mechanism for `MongoDocument`, changing it from
`Memory` to `Iceberg`.
  • Loading branch information
shengquan-ni authored Jan 9, 2025
1 parent 7debf45 commit 630c59d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.MEMORY | OpResultStorage.ICEBERG =>
case OpResultStorage.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
import io.dropwizard.auth.Auth
import org.jooq.impl.DSL._
import org.jooq.types.{UInteger, ULong}

import java.net.URI
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
*/
object OpResultStorage {
val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase
val MEMORY: String = "memory"
val MONGODB: String = "mongodb"
val ICEBERG = "iceberg"

Expand Down Expand Up @@ -115,9 +114,7 @@ class OpResultStorage extends Serializable with LazyLogging {
schema: Schema
): VirtualDocument[Tuple] = {
val storage: VirtualDocument[Tuple] =
if (mode == OpResultStorage.MEMORY) {
new MemoryDocument[Tuple](key)
} else if (mode == OpResultStorage.MONGODB) {
if (mode == OpResultStorage.MONGODB) {
try {
new MongoDocument[Tuple](
executionId + key,
Expand All @@ -127,22 +124,13 @@ class OpResultStorage extends Serializable with LazyLogging {
} catch {
case t: Throwable =>
logger.warn("Failed to create MongoDB storage", t)
logger.info(s"Falling back to memory storage for $key")
new MemoryDocument[Tuple](key)
logger.info(s"Falling back to Iceberg storage for $key")
createIcebergDocument(executionId, key, schema)
}
} else if (mode == OpResultStorage.ICEBERG) {
createIcebergDocument(executionId, key, schema)
} else {
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
IcebergUtil.fromRecord(record, schema)

new IcebergDocument[Tuple](
StorageConfig.icebergTableNamespace,
executionId + key,
icebergSchema,
serde,
deserde
)
throw new IllegalArgumentException(s"Unsupported storage mode: $mode")
}
cache.put(key, (storage, schema))
storage
Expand Down Expand Up @@ -172,4 +160,23 @@ class OpResultStorage extends Serializable with LazyLogging {
def getAllKeys: Set[String] = {
cache.keySet().iterator().asScala.toSet
}

private def createIcebergDocument(
executionId: String,
key: String,
schema: Schema
): IcebergDocument[Tuple] = {
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
IcebergUtil.fromRecord(record, schema)

new IcebergDocument[Tuple](
StorageConfig.icebergTableNamespace,
executionId + key,
icebergSchema,
serde,
deserde
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import edu.uci.ics.amber.core.storage.model.VirtualDocumentSpec
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import edu.uci.ics.amber.util.IcebergUtil
import edu.uci.ics.texera.dao.MockTexeraDB
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
Expand Down

0 comments on commit 630c59d

Please sign in to comment.