Skip to content

Commit

Permalink
Merge branch 'master' into yunyad-remove-redundancy
Browse files Browse the repository at this point in the history
  • Loading branch information
yunyad authored Jan 9, 2025
2 parents 4fe5da6 + 7debf45 commit 8ca0f0b
Show file tree
Hide file tree
Showing 24 changed files with 1,535 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ trait InitializeExecutorHandler {
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
new ProgressiveSinkOpExec(
workerIdx,
outputMode,
storageKey,
workflowIdentity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.MEMORY =>
case OpResultStorage.MEMORY | OpResultStorage.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ object WorkflowExecutionsResource {
eId: UInteger,
vId: UInteger,
userName: String,
googleAvatar: String,
status: Byte,
result: String,
startingTime: Timestamp,
Expand Down Expand Up @@ -193,12 +194,8 @@ class WorkflowExecutionsResource {
.select(
WORKFLOW_EXECUTIONS.EID,
WORKFLOW_EXECUTIONS.VID,
field(
context
.select(USER.NAME)
.from(USER)
.where(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
),
USER.NAME,
USER.GOOGLE_AVATAR,
WORKFLOW_EXECUTIONS.STATUS,
WORKFLOW_EXECUTIONS.RESULT,
WORKFLOW_EXECUTIONS.STARTING_TIME,
Expand All @@ -210,6 +207,8 @@ class WorkflowExecutionsResource {
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.join(USER)
.on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
.where(WORKFLOW_VERSION.WID.eq(wid))
.fetchInto(classOf[WorkflowExecutionEntry])
.asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class WorkflowCompiler(
val storageKey =
OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId)

// Determine the storage type, defaulting to memory for large HTML visualizations
// Determine the storage type, defaulting to iceberg for large HTML visualizations
val storageType =
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.ICEBERG
else OpResultStorage.defaultStorageMode

if (!storage.contains(storageKey)) {
Expand Down
23 changes: 21 additions & 2 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,37 @@ lazy val WorkflowCore = (project in file("workflow-core"))
.dependsOn(DAO)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency
lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore)
lazy val WorkflowOperator = (project in file("workflow-operator"))
.dependsOn(WorkflowCore)
.settings(
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg
)
)
lazy val WorkflowCompilingService = (project in file("workflow-compiling-service"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it, as io.dropwizard 4 requires 2.16.1 or higher
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1"
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
"org.glassfish.jersey.core" % "jersey-common" % "3.0.12"
)
)

lazy val WorkflowExecutionService = (project in file("amber"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % "2.15.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
)
)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@
nzType="star"></i>
</td>
<td nzEllipsis>
<nz-avatar
[ngStyle]="{ 'background-color': setAvatarColor(row.userName) }"
[nzGap]="1"
[nzText]="abbreviate(row.userName || 'anonymous', false)"
nzSize="default"></nz-avatar>
<!-- userColor must be a property binding ([userColor]), otherwise the component
     receives the literal string "setAvatarColor(row.userName)" instead of the
     evaluated color, matching the bindings on googleAvatar and userName. -->
<texera-user-avatar
  [googleAvatar]="row.googleAvatar"
  [userColor]="setAvatarColor(row.userName)"
  [userName]="abbreviate(row.userName || 'anonymous', false)"></texera-user-avatar>
</td>
<td nzEllipsis>
<label
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export interface WorkflowExecutionsEntry {
vId: number;
sId: number;
userName: string;
googleAvatar: string;
name: string;
startingTime: number;
completionTime: number;
Expand Down
50 changes: 49 additions & 1 deletion core/workflow-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,54 @@ val arrowDependencies = Seq(

libraryDependencies ++= arrowDependencies

/////////////////////////////////////////////////////////////////////////////
// Iceberg-related Dependencies
/////////////////////////////////////////////////////////////////////////////
val excludeJersey = ExclusionRule(organization = "com.sun.jersey")
val excludeGlassfishJersey = ExclusionRule(organization = "org.glassfish.jersey")
val excludeSlf4j = ExclusionRule(organization = "org.slf4j")
val excludeJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeJsp = ExclusionRule(organization = "javax.servlet.jsp")
val excludeXmlBind = ExclusionRule(organization = "javax.xml.bind")
val excludeJackson = ExclusionRule(organization = "com.fasterxml.jackson.core")
val excludeJacksonModule = ExclusionRule(organization = "com.fasterxml.jackson.module")

libraryDependencies ++= Seq(
"org.apache.iceberg" % "iceberg-api" % "1.7.1",
"org.apache.iceberg" % "iceberg-parquet" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-core" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-data" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
)

/////////////////////////////////////////////////////////////////////////////
// Additional Dependencies
/////////////////////////////////////////////////////////////////////////////
Expand All @@ -123,5 +171,5 @@ libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
"org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions
"org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions
)
14 changes: 13 additions & 1 deletion core/workflow-core/src/main/resources/storage-config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
storage:
result-storage-mode: memory
result-storage-mode: iceberg
mongodb:
url: "mongodb://localhost:27017"
database: "texera_storage"
commit-batch-size: 1000
iceberg:
table:
namespace: "operator-result"
commit:
batch-size: 4096 # decides the write-buffer size of the IcebergTableWriter
retry:
# retry configures the OCC parameter for concurrent write operations in Iceberg
# Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
# Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
num-retries: 10
min-wait-ms: 100 # 0.1s
max-wait-ms: 10000 # 10s
jdbc:
url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
username: ""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.catalog.Catalog

/**
 * IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
 * - Provides a single shared catalog for all Iceberg table-related operations in the Texera application.
 * - Lazily initializes the catalog on first access, in a thread-safe manner.
 * - Supports replacing the catalog instance primarily for testing or reconfiguration.
 */
object IcebergCatalogInstance {

  // @volatile so the fast path in getInstance() can read without locking;
  // writes are guarded by `this` to prevent two racing threads from each
  // creating (and leaking) a separate HadoopCatalog.
  @volatile private var instance: Option[Catalog] = None

  /**
   * Retrieves the singleton Iceberg catalog instance.
   * - If the catalog is not initialized, it is lazily created using the configured properties.
   * - Uses double-checked locking: the common already-initialized case is lock-free.
   * @return the Iceberg catalog instance.
   */
  def getInstance(): Catalog = {
    instance match {
      case Some(catalog) => catalog
      case None =>
        synchronized {
          // Re-check under the lock: another thread may have initialized
          // the catalog while this one was waiting.
          instance match {
            case Some(catalog) => catalog
            case None =>
              val hadoopCatalog = IcebergUtil.createHadoopCatalog(
                "texera_iceberg",
                StorageConfig.fileStorageDirectoryPath
              )
              instance = Some(hadoopCatalog)
              hadoopCatalog
          }
        }
    }
  }

  /**
   * Replaces the existing Iceberg catalog instance.
   * - This method is useful for testing or dynamically updating the catalog.
   *
   * @param catalog the new Iceberg catalog instance to replace the current one.
   */
  def replaceInstance(catalog: Catalog): Unit = synchronized {
    instance = Some(catalog)
  }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.PathUtils.corePath
import org.yaml.snakeyaml.Yaml

import java.nio.file.Path
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

Expand All @@ -13,34 +15,94 @@ object StorageConfig {

val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
javaConf
.updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))

javaConf.updated(
"storage",
storageMap
.updated("mongodb", mongodbMap)
.updated(
"iceberg",
icebergMap
.updated(
"table",
icebergTableMap.updated(
"commit",
icebergCommitMap.updated("retry", icebergRetryMap)
)
)
)
.updated("jdbc", jdbcMap)
)
}

// Result storage mode
val resultStorageMode: String =
conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]

// For MongoDB specifics
// MongoDB configurations
val mongodbUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val mongodbDatabaseName: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("database")
.asInstanceOf[String]

val mongodbBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("commit-batch-size")
.asInstanceOf[Int]

val icebergTableNamespace: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("namespace")
.asInstanceOf[String]

val icebergTableCommitBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("batch-size")
.asInstanceOf[Int]

val icebergTableCommitNumRetries: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("num-retries")
.asInstanceOf[Int]

val icebergTableCommitMinRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("min-wait-ms")
.asInstanceOf[Int]

val icebergTableCommitMaxRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("max-wait-ms")
.asInstanceOf[Int]

// JDBC configurations
val jdbcUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

// For jdbc specifics
val jdbcUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
Expand All @@ -50,4 +112,8 @@ object StorageConfig {
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

// File storage configurations
val fileStorageDirectoryPath: Path =
corePath.resolve("amber").resolve("user-resources").resolve("workflow-results")
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {

/**
* return a writer that buffers the items and performs the flush operation at close time
* @param writerIdentifier the id of the writer; may be required by some implementations
* @return a buffered item writer
*/
def writer(): BufferedItemWriter[T] =
def writer(writerIdentifier: String): BufferedItemWriter[T] =
throw new NotImplementedError("write method is not implemented")

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class MemoryDocument[T >: Null <: AnyRef](key: String)
results += item
}

override def writer(): BufferedItemWriter[T] = this
override def writer(writerIdentifier: String): BufferedItemWriter[T] = this

/**
* The size of the buffer for the buffered writer. This number is not used currently
Expand Down
Loading

0 comments on commit 8ca0f0b

Please sign in to comment.