Skip to content

Commit

Permalink
Merge branch 'master' into xinyuan-if-statement
Browse files Browse the repository at this point in the history
  • Loading branch information
aglinxinyuan authored Jan 13, 2025
2 parents 1186777 + d4de38f commit 4af0002
Show file tree
Hide file tree
Showing 30 changed files with 1,585 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ trait InitializeExecutorHandler {
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
new ProgressiveSinkOpExec(
workerIdx,
outputMode,
storageKey,
workflowIdentity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.MEMORY =>
case OpResultStorage.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.{
DatasetAccessResource,
DatasetResource
}
import edu.uci.ics.texera.web.resource.dashboard.user.discussion.UserDiscussionResource
import edu.uci.ics.texera.web.resource.dashboard.user.project.{
ProjectAccessResource,
ProjectResource,
Expand Down Expand Up @@ -147,7 +146,6 @@ class TexeraWebApplication
environment.jersey.register(classOf[GmailResource])
environment.jersey.register(classOf[AdminExecutionResource])
environment.jersey.register(classOf[UserQuotaResource])
environment.jersey.register(classOf[UserDiscussionResource])
environment.jersey.register(classOf[AIAssistantResource])
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
import io.dropwizard.auth.Auth
import org.jooq.impl.DSL._
import org.jooq.types.{UInteger, ULong}

import java.net.URI
Expand Down Expand Up @@ -96,6 +95,7 @@ object WorkflowExecutionsResource {
eId: UInteger,
vId: UInteger,
userName: String,
googleAvatar: String,
status: Byte,
result: String,
startingTime: Timestamp,
Expand Down Expand Up @@ -193,12 +193,8 @@ class WorkflowExecutionsResource {
.select(
WORKFLOW_EXECUTIONS.EID,
WORKFLOW_EXECUTIONS.VID,
field(
context
.select(USER.NAME)
.from(USER)
.where(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
),
USER.NAME,
USER.GOOGLE_AVATAR,
WORKFLOW_EXECUTIONS.STATUS,
WORKFLOW_EXECUTIONS.RESULT,
WORKFLOW_EXECUTIONS.STARTING_TIME,
Expand All @@ -210,6 +206,8 @@ class WorkflowExecutionsResource {
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.join(USER)
.on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
.where(WORKFLOW_VERSION.WID.eq(wid))
.fetchInto(classOf[WorkflowExecutionEntry])
.asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class WorkflowCompiler(
val storageKey =
OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId)

// Determine the storage type, defaulting to memory for large HTML visualizations
// Determine the storage type, defaulting to iceberg for large HTML visualizations
val storageType =
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.ICEBERG
else OpResultStorage.defaultStorageMode

if (!storage.contains(storageKey)) {
Expand Down
23 changes: 21 additions & 2 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,37 @@ lazy val WorkflowCore = (project in file("workflow-core"))
.dependsOn(DAO)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency
lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore)
lazy val WorkflowOperator = (project in file("workflow-operator"))
.dependsOn(WorkflowCore)
.settings(
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg
)
)
lazy val WorkflowCompilingService = (project in file("workflow-compiling-service"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1"
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
"org.glassfish.jersey.core" % "jersey-common" % "3.0.12"
)
)

lazy val WorkflowExecutionService = (project in file("amber"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % "2.15.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
)
)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency

Expand Down
1 change: 1 addition & 0 deletions core/gui/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ registerLocaleData(en);
tokenGetter: AuthService.getAccessToken,
skipWhenExpired: false,
throwNoTokenError: false,
disallowedRoutes: ["forum/api/users"],
},
}),
BrowserAnimationsModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class WorkflowPersistService {
name: workflow.name,
description: workflow.description,
content: JSON.stringify(workflow.content),
isPublished: workflow.isPublished,
isPublic: workflow.isPublished,
})
.pipe(
filter((updatedWorkflow: Workflow) => updatedWorkflow != null),
Expand Down
44 changes: 24 additions & 20 deletions core/gui/src/app/dashboard/component/dashboard.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,6 @@ export class DashboardComponent implements OnInit {

this.isCollpased = false;

if (!document.cookie.includes("flarum_remember") && this.isLogin) {
this.flarumService
.auth()
.pipe(untilDestroyed(this))
.subscribe({
next: (response: any) => {
document.cookie = `flarum_remember=${response.token};path=/`;
},
error: (err: unknown) => {
if ([404, 500].includes((err as HttpErrorResponse).status)) {
this.displayForum = false;
} else {
this.flarumService
.register()
.pipe(untilDestroyed(this))
.subscribe(() => this.ngOnInit());
}
},
});
}
this.router.events.pipe(untilDestroyed(this)).subscribe(() => {
this.checkRoute();
});
Expand All @@ -94,11 +74,35 @@ export class DashboardComponent implements OnInit {
this.ngZone.run(() => {
this.isLogin = this.userService.isLogin();
this.isAdmin = this.userService.isAdmin();
this.forumLogin();
this.cdr.detectChanges();
});
});
}

forumLogin() {
if (!document.cookie.includes("flarum_remember") && this.isLogin) {
this.flarumService
.auth()
.pipe(untilDestroyed(this))
.subscribe({
next: (response: any) => {
document.cookie = `flarum_remember=${response.token};path=/`;
},
error: (err: unknown) => {
if ([404, 500].includes((err as HttpErrorResponse).status)) {
this.displayForum = false;
} else {
this.flarumService
.register()
.pipe(untilDestroyed(this))
.subscribe(() => this.forumLogin());
}
},
});
}
}

checkRoute() {
const currentRoute = this.router.url;
this.displayNavbar = this.isNavbarEnabled(currentRoute);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@
nzType="star"></i>
</td>
<td nzEllipsis>
<nz-avatar
[ngStyle]="{ 'background-color': setAvatarColor(row.userName) }"
[nzGap]="1"
[nzText]="abbreviate(row.userName || 'anonymous', false)"
nzSize="default"></nz-avatar>
<texera-user-avatar
[googleAvatar]="row.googleAvatar"
userColor="setAvatarColor(row.userName)"
[userName]="abbreviate(row.userName || 'anonymous', false)"></texera-user-avatar>
</td>
<td nzEllipsis>
<label
Expand Down
23 changes: 14 additions & 9 deletions core/gui/src/app/dashboard/service/user/flarum/flarum.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Injectable } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { UserService } from "../../../../common/service/user/user.service";
import { AppSettings } from "../../../../common/app-setting";

@Injectable({
providedIn: "root",
Expand All @@ -11,16 +10,22 @@ export class FlarumService {
private http: HttpClient,
private userService: UserService
) {}
public register() {
return this.http.put(`${AppSettings.getApiEndpoint()}/discussion/register`, {});
}

auth() {
const currentUser = this.userService.getCurrentUser();
register() {
const user = this.userService.getCurrentUser();
return this.http.post(
"forum/api/token",
{ identification: currentUser!.email, password: currentUser!.googleId, remember: "1" },
{ headers: { "Content-Type": "application/json" }, withCredentials: true }
"forum/api/users",
{
data: {
attributes: { username: user!.email.split("@")[0] + user!.uid, email: user!.email, password: user!.googleId },
},
},
{ headers: { Authorization: "Token hdebsyxiigyklxgsqivyswwiisohzlnezzzzzzzz;userId=1" } }
);
}

auth() {
const user = this.userService.getCurrentUser();
return this.http.post("forum/api/token", { identification: user!.email, password: user!.googleId, remember: "1" });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export interface WorkflowExecutionsEntry {
vId: number;
sId: number;
userName: string;
googleAvatar: string;
name: string;
startingTime: number;
completionTime: number;
Expand Down
50 changes: 49 additions & 1 deletion core/workflow-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,54 @@ val arrowDependencies = Seq(

libraryDependencies ++= arrowDependencies

/////////////////////////////////////////////////////////////////////////////
// Iceberg-related Dependencies
/////////////////////////////////////////////////////////////////////////////
val excludeJersey = ExclusionRule(organization = "com.sun.jersey")
val excludeGlassfishJersey = ExclusionRule(organization = "org.glassfish.jersey")
val excludeSlf4j = ExclusionRule(organization = "org.slf4j")
val excludeJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeJsp = ExclusionRule(organization = "javax.servlet.jsp")
val excludeXmlBind = ExclusionRule(organization = "javax.xml.bind")
val excludeJackson = ExclusionRule(organization = "com.fasterxml.jackson.core")
val excludeJacksonModule = ExclusionRule(organization = "com.fasterxml.jackson.module")

libraryDependencies ++= Seq(
"org.apache.iceberg" % "iceberg-api" % "1.7.1",
"org.apache.iceberg" % "iceberg-parquet" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-core" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-data" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
)

/////////////////////////////////////////////////////////////////////////////
// Additional Dependencies
/////////////////////////////////////////////////////////////////////////////
Expand All @@ -123,5 +171,5 @@ libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
"org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions
"org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions
)
14 changes: 13 additions & 1 deletion core/workflow-core/src/main/resources/storage-config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
storage:
result-storage-mode: memory
result-storage-mode: iceberg
mongodb:
url: "mongodb://localhost:27017"
database: "texera_storage"
commit-batch-size: 1000
iceberg:
table:
namespace: "operator-result"
commit:
batch-size: 4096 # decide the buffer size of our IcebergTableWriter
retry:
# retry configures the OCC parameter for concurrent write operations in Iceberg
# Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
# Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
num-retries: 10
min-wait-ms: 100 # 0.1s
max-wait-ms: 10000 # 10s
jdbc:
url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
username: ""
Expand Down
Loading

0 comments on commit 4af0002

Please sign in to comment.