Add IcebergDocument as one type of the operator result storage (#3147)
# Implement Apache Iceberg for Result Storage

<img width="556" alt="Screenshot 2025-01-06 at 3 18 19 PM"
src="https://github.com/user-attachments/assets/4edadb64-ee28-48ee-8d3c-1d1891d69d6a"
/>


## How to Enable Iceberg Result Storage
1. Update `storage-config.yaml`:
   - Set `result-storage-mode` to `iceberg` (see the excerpt below).
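
For reference, the relevant section of the updated `storage-config.yaml` looks like the excerpt below; the values mirror the defaults added in this PR's diff.

```yaml
storage:
  result-storage-mode: iceberg
  iceberg:
    table:
      namespace: "operator-result"
      commit:
        batch-size: 4096   # buffer size of IcebergTableWriter
        retry:             # OCC parameters for concurrent commits
          num-retries: 10
          min-wait-ms: 100   # 0.1s
          max-wait-ms: 10000 # 10s
```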

## Major Changes
- **Introduced `IcebergDocument`**: A thread-safe `VirtualDocument`
implementation for storing and reading results in Iceberg tables.
- **Introduced `IcebergTableWriter`**: Append-only writer for Iceberg
tables with configurable buffer size.
- **Catalog and Data storage for Iceberg**: Uses a local file system
(`file:/`) via `HadoopCatalog` and `HadoopFileIO`. This ensures Iceberg
operates without relying on external storage services.
- **`ProgressiveSinkOpExec` now takes a `workerId` parameter**: each result-storage writer is created with its worker's id, so files written by different workers do not collide (see the sketch after this list).
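
A hedged sketch of how the new writer path fits together, based on the interfaces changed in this PR; the `open`/`putOne`/`close` method names on the buffered writer are assumptions for illustration, not taken from the diff.

```scala
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple

// Sketch only: a sink executor obtains a writer scoped to its worker id via the
// new VirtualDocument.writer(writerIdentifier) signature and streams tuples into it.
def writeResults(doc: VirtualDocument[Tuple], workerId: String, tuples: Iterator[Tuple]): Unit = {
  val writer = doc.writer(workerId) // an IcebergTableWriter when iceberg mode is active
  writer.open()                     // assumed method name
  tuples.foreach(writer.putOne)     // assumed method name; buffered, flushed in batches
  writer.close()                    // assumed method name; flushes remaining buffered tuples
}
```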

## Dependencies
- Added Apache Iceberg-related libraries.
- Introduced Hadoop-related libraries to support Iceberg's `HadoopCatalog` and `HadoopFileIO`. They are needed only for configuration and do not add a runtime dependency on HDFS.

## Overview of Iceberg Components
### `IcebergDocument`
- Manages reading and organizing data in Iceberg tables.
- Supports iterator-based incremental reads with thread-safe operations
for reading and clearing data.
- Initializes the Iceberg table during construction, overwriting any existing table with the same name (construction is sketched below).
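
The construction below mirrors how `OpResultStorage` builds an `IcebergDocument` in this PR's diff; treating `executionId` as a plain string is a simplification for the sketch.

```scala
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}

// Convert the Texera schema to an Iceberg schema and supply (de)serialization
// functions between Texera Tuples and Iceberg GenericRecords, as done in OpResultStorage.
def createIcebergDocument(executionId: String, key: String, schema: Schema): IcebergDocument[Tuple] = {
  val icebergSchema = IcebergUtil.toIcebergSchema(schema)
  val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
  val deserde: (IcebergSchema, Record) => Tuple = (_, record) => IcebergUtil.fromRecord(record, schema)

  new IcebergDocument[Tuple](
    StorageConfig.icebergTableNamespace, // "operator-result" by default
    executionId + key,                   // one table per storage key
    icebergSchema,
    serde,
    deserde
  )
}
```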

### `IcebergTableWriter`
- Writes data as immutable Parquet files in an append-only manner.
- Each writer prefixes its files with a unique `workerIndex_fileIndex` identifier to avoid name conflicts (sketched below).
- Not thread-safe; each instance should be used by a single thread.
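
A minimal sketch of the naming convention described above (illustrative only; `IcebergTableWriter` generates these names internally, and the `.parquet` suffix is an assumption):

```scala
// Each writer owns a fixed workerIndex and increments a local fileIndex per
// flushed Parquet file, so files from concurrent writers can never collide.
class ParquetFileNamer(workerIndex: Int) {
  private var fileIndex = 0
  def nextFileName(): String = {
    val name = s"${workerIndex}_${fileIndex}.parquet"
    fileIndex += 1
    name
  }
}
```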

## Data Storage via Iceberg Tables
- **Write**:
  - A table is created per storage key.
  - Writers append immutable Parquet data files to the table.
- **Read**:
  - Readers use `IcebergDocument.get` to fetch data via an iterator (usage sketched below).
  - The iterator reads data incrementally and preserves the commit order of the data files.
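
A hedged read-side sketch; `get` is named in this PR, while `getRange` is the assumed name of the range-based accessor mentioned in the next section.

```scala
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple

// Sketch only: get() streams committed results incrementally, preserving commit order.
def readResults(doc: VirtualDocument[Tuple]): Unit = {
  val all: Iterator[Tuple] = doc.get()
  all.take(5).foreach(println)                     // e.g. preview the first few tuples
  val page: Iterator[Tuple] = doc.getRange(0, 100) // assumed accessor for (from, until) reads
  println(s"page size = ${page.size}")
}
```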

## Data Reading Using File Metadata
- Data files are read using `getUsingFileSequenceOrder`, which:
  - retrieves the file metadata (`FileScanTask`s) and sorts it by sequence number;
  - reads records sequentially, skipping files or records as needed;
  - supports range-based (`from`, `until`) and incremental reads.
- Sorting by sequence number ensures consistency and preserves commit order (see the sketch below).
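
The ordering step can be illustrated with the Iceberg scan API. This is a sketch of the idea, not the actual `getUsingFileSequenceOrder` implementation; it assumes `ContentFile.dataSequenceNumber()` (available in Iceberg 1.7) reflects commit order.

```scala
import org.apache.iceberg.{FileScanTask, Table}
import scala.jdk.CollectionConverters._

// Plan the table's data files, then sort the FileScanTasks by the sequence number
// assigned at commit time, so records are read in the order the files were committed.
def planTasksInCommitOrder(table: Table): Seq[FileScanTask] = {
  val tasks = table.newScan().planFiles().iterator().asScala.toSeq
  tasks.sortBy(task => Option(task.file().dataSequenceNumber()).map(_.longValue()).getOrElse(0L))
}
```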

## Hadoop Usage Without HDFS
- The `HadoopCatalog` uses an empty Hadoop configuration, defaulting to
the local file system (`file:/`).
- This enables managing Iceberg tables on local or network file systems without any HDFS infrastructure (a minimal sketch follows).
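
A minimal sketch of the idea, approximating what `IcebergUtil.createHadoopCatalog` presumably does; the exact construction in the PR may differ.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopCatalog

// An empty Hadoop Configuration resolves paths against the default file:/ scheme,
// so the catalog and its warehouse live on the local file system; no HDFS is needed.
def createLocalHadoopCatalog(warehouseDir: java.nio.file.Path): HadoopCatalog = {
  val conf = new Configuration()
  new HadoopCatalog(conf, warehouseDir.toString)
}
```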

---------

Co-authored-by: Shengquan Ni <[email protected]>
bobbai00 and shengquan-ni authored Jan 8, 2025
1 parent a1186f8 commit 7debf45
Showing 21 changed files with 1,525 additions and 18 deletions.

@@ -28,6 +28,7 @@ trait InitializeExecutorHandler {
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
new ProgressiveSinkOpExec(
workerIdx,
outputMode,
storageKey,
workflowIdentity

@@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.MEMORY =>
case OpResultStorage.MEMORY | OpResultStorage.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)

@@ -80,9 +80,9 @@ class WorkflowCompiler(
val storageKey =
OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId)

// Determine the storage type, defaulting to memory for large HTML visualizations
// Determine the storage type, defaulting to iceberg for large HTML visualizations
val storageType =
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.ICEBERG
else OpResultStorage.defaultStorageMode

if (!storage.contains(storageKey)) {

23 changes: 21 additions & 2 deletions core/build.sbt
@@ -3,18 +3,37 @@ lazy val WorkflowCore = (project in file("workflow-core"))
.dependsOn(DAO)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency
lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore)
lazy val WorkflowOperator = (project in file("workflow-operator"))
.dependsOn(WorkflowCore)
.settings(
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg
)
)
lazy val WorkflowCompilingService = (project in file("workflow-compiling-service"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1"
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
"org.glassfish.jersey.core" % "jersey-common" % "3.0.12"
)
)

lazy val WorkflowExecutionService = (project in file("amber"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % "2.15.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
)
)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency


50 changes: 49 additions & 1 deletion core/workflow-core/build.sbt
@@ -111,6 +111,54 @@ val arrowDependencies = Seq(

libraryDependencies ++= arrowDependencies

/////////////////////////////////////////////////////////////////////////////
// Iceberg-related Dependencies
/////////////////////////////////////////////////////////////////////////////
val excludeJersey = ExclusionRule(organization = "com.sun.jersey")
val excludeGlassfishJersey = ExclusionRule(organization = "org.glassfish.jersey")
val excludeSlf4j = ExclusionRule(organization = "org.slf4j")
val excludeJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeJsp = ExclusionRule(organization = "javax.servlet.jsp")
val excludeXmlBind = ExclusionRule(organization = "javax.xml.bind")
val excludeJackson = ExclusionRule(organization = "com.fasterxml.jackson.core")
val excludeJacksonModule = ExclusionRule(organization = "com.fasterxml.jackson.module")

libraryDependencies ++= Seq(
"org.apache.iceberg" % "iceberg-api" % "1.7.1",
"org.apache.iceberg" % "iceberg-parquet" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-core" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-data" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
)

/////////////////////////////////////////////////////////////////////////////
// Additional Dependencies
/////////////////////////////////////////////////////////////////////////////
@@ -123,5 +171,5 @@ libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
"org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions
"org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions
)

14 changes: 13 additions & 1 deletion core/workflow-core/src/main/resources/storage-config.yaml
@@ -1,9 +1,21 @@
storage:
result-storage-mode: memory
result-storage-mode: iceberg
mongodb:
url: "mongodb://localhost:27017"
database: "texera_storage"
commit-batch-size: 1000
iceberg:
table:
namespace: "operator-result"
commit:
batch-size: 4096 # decide the buffer size of our IcebergTableWriter
retry:
# retry configures the OCC parameter for concurrent write operations in Iceberg
# Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
# Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
num-retries: 10
min-wait-ms: 100 # 0.1s
max-wait-ms: 10000 # 10s
jdbc:
url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
username: ""

@@ -0,0 +1,43 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.catalog.Catalog

/**
* IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
* - Provides a single shared catalog for all Iceberg table-related operations in the Texera application.
* - Lazily initializes the catalog on first access.
* - Supports replacing the catalog instance primarily for testing or reconfiguration.
*/
object IcebergCatalogInstance {

private var instance: Option[Catalog] = None

/**
* Retrieves the singleton Iceberg catalog instance.
* - If the catalog is not initialized, it is lazily created using the configured properties.
* @return the Iceberg catalog instance.
*/
def getInstance(): Catalog = {
instance match {
case Some(catalog) => catalog
case None =>
val hadoopCatalog = IcebergUtil.createHadoopCatalog(
"texera_iceberg",
StorageConfig.fileStorageDirectoryPath
)
instance = Some(hadoopCatalog)
hadoopCatalog
}
}

/**
* Replaces the existing Iceberg catalog instance.
* - This method is useful for testing or dynamically updating the catalog.
*
* @param catalog the new Iceberg catalog instance to replace the current one.
*/
def replaceInstance(catalog: Catalog): Unit = {
instance = Some(catalog)
}
}

@@ -1,7 +1,9 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.PathUtils.corePath
import org.yaml.snakeyaml.Yaml

import java.nio.file.Path
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

@@ -13,34 +15,94 @@ object StorageConfig {

val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
javaConf
.updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))

javaConf.updated(
"storage",
storageMap
.updated("mongodb", mongodbMap)
.updated(
"iceberg",
icebergMap
.updated(
"table",
icebergTableMap.updated(
"commit",
icebergCommitMap.updated("retry", icebergRetryMap)
)
)
)
.updated("jdbc", jdbcMap)
)
}

// Result storage mode
val resultStorageMode: String =
conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]

// For MongoDB specifics
// MongoDB configurations
val mongodbUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val mongodbDatabaseName: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("database")
.asInstanceOf[String]

val mongodbBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("commit-batch-size")
.asInstanceOf[Int]

val icebergTableNamespace: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("namespace")
.asInstanceOf[String]

val icebergTableCommitBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("batch-size")
.asInstanceOf[Int]

val icebergTableCommitNumRetries: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("num-retries")
.asInstanceOf[Int]

val icebergTableCommitMinRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("min-wait-ms")
.asInstanceOf[Int]

val icebergTableCommitMaxRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("max-wait-ms")
.asInstanceOf[Int]

// JDBC configurations
val jdbcUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

// For jdbc specifics
val jdbcUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
@@ -50,4 +112,8 @@
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

// File storage configurations
val fileStorageDirectoryPath: Path =
corePath.resolve("amber").resolve("user-resources").resolve("workflow-results")
}

@@ -65,9 +65,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {

/**
* return a writer that buffers the items and performs the flush operation at close time
* @param writerIdentifier the id of the writer, maybe required by some implementations
* @return a buffered item writer
*/
def writer(): BufferedItemWriter[T] =
def writer(writerIdentifier: String): BufferedItemWriter[T] =
throw new NotImplementedError("write method is not implemented")

/**

@@ -69,7 +69,7 @@ class MemoryDocument[T >: Null <: AnyRef](key: String)
results += item
}

override def writer(): BufferedItemWriter[T] = this
override def writer(writerIdentifier: String): BufferedItemWriter[T] = this

/**
* The size of the buffer for the buffered writer. This number is not used currently

@@ -56,7 +56,7 @@ class MongoDocument[T >: Null <: AnyRef](
* Return a buffered item writer for the MongoDB collection.
* @return a new instance of MongoDBBufferedItemWriter.
*/
override def writer(): BufferedItemWriter[T] = {
override def writer(writerIdentifier: String): BufferedItemWriter[T] = {
new MongoDBBufferedItemWriter[T](
commitBatchSize,
id,

@@ -3,9 +3,13 @@ package edu.uci.ics.amber.core.storage.result
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.IteratorHasAsScala
@@ -18,6 +22,7 @@ object OpResultStorage {
val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase
val MEMORY: String = "memory"
val MONGODB: String = "mongodb"
val ICEBERG = "iceberg"

/**
* Creates a unique storage key by combining operator and port identities.
@@ -112,7 +117,7 @@ class OpResultStorage extends Serializable with LazyLogging {
val storage: VirtualDocument[Tuple] =
if (mode == OpResultStorage.MEMORY) {
new MemoryDocument[Tuple](key)
} else {
} else if (mode == OpResultStorage.MONGODB) {
try {
new MongoDocument[Tuple](
executionId + key,
@@ -125,6 +130,19 @@
logger.info(s"Falling back to memory storage for $key")
new MemoryDocument[Tuple](key)
}
} else {
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
IcebergUtil.fromRecord(record, schema)

new IcebergDocument[Tuple](
StorageConfig.icebergTableNamespace,
executionId + key,
icebergSchema,
serde,
deserde
)
}
cache.put(key, (storage, schema))
storage