From 13cb52ed5ae5698b859df08204ff0594dd34816d Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Wed, 15 Jan 2025 19:27:04 -0800 Subject: [PATCH 1/2] Fix slow scan operators (#3215) This PR fixes the slow workflow execution issue by calling desc.sourceSchema only once in the scan operator executor's constructor, instead of for every output tuple. This PR also includes some reformatting to remove unused imports. --- .../amber/operator/source/scan/csv/CSVScanSourceOpExec.scala | 5 +++-- .../source/scan/csv/ParallelCSVScanSourceOpExec.scala | 2 +- .../operator/source/scan/json/JSONLScanSourceOpExec.scala | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala index df5953371e..eb99ae4f83 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.source.scan.csv import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings} import edu.uci.ics.amber.core.executor.SourceOperatorExecutor import edu.uci.ics.amber.core.storage.DocumentFactory -import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, TupleLike} +import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike} import edu.uci.ics.amber.util.JSONUtils.objectMapper import java.io.InputStreamReader @@ -16,6 +16,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat var parser: CsvParser = _ var nextRow: Array[String] = _ var numRowGenerated = 0 + private val schema: Schema = desc.sourceSchema() override def produceTuple(): Iterator[TupleLike] = { @@ -42,7 +43,7 @@ 
class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat try { TupleLike( ArraySeq.unsafeWrapArray( - AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], desc.sourceSchema()) + AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], schema) ): _* ) } catch { diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala index 108050c397..f17a7901f2 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala @@ -20,6 +20,7 @@ class ParallelCSVScanSourceOpExec private[csv] ( val desc: ParallelCSVScanSourceOpDesc = objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc]) private var reader: BufferedBlockReader = _ + private val schema = desc.sourceSchema() override def produceTuple(): Iterator[TupleLike] = new Iterator[TupleLike]() { @@ -42,7 +43,6 @@ class ParallelCSVScanSourceOpExec private[csv] ( return null } - val schema = desc.sourceSchema() // however the null values won't present if omitted in the end, we need to match nulls. 
if (fields.length != schema.getAttributes.size) fields = Stream diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala index 9c4e9b01e0..bb13a06fdf 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala @@ -21,11 +21,11 @@ class JSONLScanSourceOpExec private[json] ( objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc]) private var rows: Iterator[String] = _ private var reader: BufferedReader = _ + private val schema = desc.sourceSchema() override def produceTuple(): Iterator[TupleLike] = { rows.flatMap { line => Try { - val schema = desc.sourceSchema() val data = JSONToMap(objectMapper.readTree(line), desc.flatten).withDefaultValue(null) val fields = schema.getAttributeNames.map { fieldName => parseField(data(fieldName), schema.getAttribute(fieldName).getType) From c5c07016a2c4dddf0a0ec4c0202043e4df041969 Mon Sep 17 00:00:00 2001 From: Zhe Yuan <86388854+MiuMiuMiue@users.noreply.github.com> Date: Wed, 15 Jan 2025 20:14:09 -0800 Subject: [PATCH 2/2] Execution Dashboard Backend Pagination & Frontend Loading Icon (#3105) This PR addresses issue #3016, where the execution dashboard takes a long time to load. Changes made: - Add a loading icon in the frontend while loading the execution info - Move the pagination to the backend, including the sorting and filtering functions. 
Loading Icon: ![execution_loading](https://github.com/user-attachments/assets/d7494b54-eb15-4361-b1cd-a70f276aa6f6) Page Size Change: ![newnewnew](https://github.com/user-attachments/assets/9e1da835-7b2c-4ca1-a02b-87b7b3694101) --------- Co-authored-by: Chris <143021053+kunwp1@users.noreply.github.com> --- .../execution/AdminExecutionResource.scala | 104 ++++++++-- .../execution/admin-execution.component.html | 60 ++++-- .../execution/admin-execution.component.scss | 12 ++ .../execution/admin-execution.component.ts | 179 +++++++----------- .../execution/admin-execution.service.ts | 20 +- 5 files changed, 239 insertions(+), 136 deletions(-) create mode 100644 core/gui/src/app/dashboard/component/admin/execution/admin-execution.component.scss diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/admin/execution/AdminExecutionResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/admin/execution/AdminExecutionResource.scala index 5bdb31437e..9a280f921f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/admin/execution/AdminExecutionResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/admin/execution/AdminExecutionResource.scala @@ -6,12 +6,13 @@ import edu.uci.ics.texera.web.auth.SessionUser import edu.uci.ics.texera.dao.jooq.generated.Tables._ import edu.uci.ics.texera.web.resource.dashboard.admin.execution.AdminExecutionResource._ import io.dropwizard.auth.Auth +import org.jooq.impl.DSL import org.jooq.types.UInteger import javax.annotation.security.RolesAllowed import javax.ws.rs._ import javax.ws.rs.core.MediaType -import scala.jdk.CollectionConverters.IterableHasAsScala +import scala.jdk.CollectionConverters._ /** * This file handles various request related to saved-executions. 
@@ -48,6 +49,25 @@ object AdminExecutionResource { } } + def mapToStatus(status: String): Int = { + status match { + case "READY" => 0 + case "RUNNING" => 1 + case "PAUSED" => 2 + case "COMPLETED" => 3 + case "FAILED" => 4 + case "KILLED" => 5 + case _ => -1 // or throw an exception, depends on your needs + } + } + + val sortFieldMapping = Map( + "workflow_name" -> WORKFLOW.NAME, + "execution_name" -> WORKFLOW_EXECUTIONS.NAME, + "initiator" -> USER.NAME, + "end_time" -> WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME + ) + } @Produces(Array(MediaType.APPLICATION_JSON)) @@ -55,42 +75,104 @@ object AdminExecutionResource { @RolesAllowed(Array("ADMIN")) class AdminExecutionResource { + @GET + @Path("/totalWorkflow") + @Produces() + def getTotalWorkflows: Int = { + context + .select( + DSL.countDistinct(WORKFLOW.WID) + ) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID)) + .join(USER) + .on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID)) + .join(WORKFLOW) + .on(WORKFLOW.WID.eq(WORKFLOW_VERSION.WID)) + .fetchOne(0, classOf[Int]) + } + /** - * This method retrieves all existing executions + * This method retrieves latest execution of each workflow for specified page. + * The returned executions are sorted and filtered according to the parameters. 
*/ @GET - @Path("/executionList") + @Path("/executionList/{pageSize}/{pageIndex}/{sortField}/{sortDirection}") @Produces(Array(MediaType.APPLICATION_JSON)) - def listWorkflows(@Auth current_user: SessionUser): List[dashboardExecution] = { - val workflowEntries = context + def listWorkflows( + @Auth current_user: SessionUser, + @PathParam("pageSize") page_size: Int = 20, + @PathParam("pageIndex") page_index: Int = 0, + @PathParam("sortField") sortField: String = "end_time", + @PathParam("sortDirection") sortDirection: String = "desc", + @QueryParam("filter") filter: java.util.List[String] + ): List[dashboardExecution] = { + val filter_status = filter.asScala.map(mapToStatus).toSeq.filter(_ != -1).asJava + + // Base query that retrieves latest execution info for each workflow without sorting and filtering. + // Only retrieving executions in current page according to pageSize and pageIndex parameters. + val executions_base_query = context .select( WORKFLOW_EXECUTIONS.UID, USER.NAME, WORKFLOW_VERSION.WID, WORKFLOW.NAME, WORKFLOW_EXECUTIONS.EID, - WORKFLOW_EXECUTIONS.VID, WORKFLOW_EXECUTIONS.STARTING_TIME, WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME, WORKFLOW_EXECUTIONS.STATUS, WORKFLOW_EXECUTIONS.NAME ) .from(WORKFLOW_EXECUTIONS) - .leftJoin(WORKFLOW_VERSION) + .join(WORKFLOW_VERSION) .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID)) - .leftJoin(USER) + .join(USER) .on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID)) - .leftJoin(WORKFLOW) + .join(WORKFLOW) .on(WORKFLOW.WID.eq(WORKFLOW_VERSION.WID)) - .fetch() + .naturalJoin( + context + .select( + DSL.max(WORKFLOW_EXECUTIONS.EID).as("eid") + ) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) + .groupBy(WORKFLOW_VERSION.WID) + ) + + // Apply filter if the status are not empty. 
+ val executions_apply_filter = if (!filter_status.isEmpty) { + executions_base_query.where(WORKFLOW_EXECUTIONS.STATUS.in(filter_status)) + } else { + executions_base_query + } + + // Apply sorting if user specified. + var executions_apply_order = + executions_apply_filter.limit(page_size).offset(page_index * page_size) + if (sortField != "NO_SORTING") { + executions_apply_order = executions_apply_filter + .orderBy( + if (sortDirection == "desc") sortFieldMapping.getOrElse(sortField, WORKFLOW.NAME).desc() + else sortFieldMapping.getOrElse(sortField, WORKFLOW.NAME).asc() + ) + .limit(page_size) + .offset(page_index * page_size) + } + + val executions = executions_apply_order.fetch() + // Retrieve the id of each workflow that the user has access to. val availableWorkflowIds = context .select(WORKFLOW_USER_ACCESS.WID) .from(WORKFLOW_USER_ACCESS) .where(WORKFLOW_USER_ACCESS.UID.eq(current_user.getUid)) .fetchInto(classOf[UInteger]) - workflowEntries + // Calculate the statistics needed for each execution. 
+ executions .map(workflowRecord => { val startingTime = workflowRecord.get(WORKFLOW_EXECUTIONS.STARTING_TIME).getTime diff --git a/core/gui/src/app/dashboard/component/admin/execution/admin-execution.component.html b/core/gui/src/app/dashboard/component/admin/execution/admin-execution.component.html index 115b5b339a..66ebf1dbfc 100644 --- a/core/gui/src/app/dashboard/component/admin/execution/admin-execution.component.html +++ b/core/gui/src/app/dashboard/component/admin/execution/admin-execution.component.html @@ -10,24 +10,42 @@ + nzShowSizeChanger + [nzScroll]="{y: '500px'}" + [nzData]="listOfExecutions" + [nzLoading]="isLoading" + [nzLoadingIndicator]="loadingTemplate" + [nzTemplateMode]="true" + [nzFrontPagination]="false" + [nzTotal]="totalWorkflows" + [nzPageSize]="pageSize" + [nzPageIndex]="currentPageIndex + 1" + [nzPageSizeOptions]="[5, 10, 20, 50]" + (nzQueryParams)="onQueryParamsChange($event)" + class="execution-table"> Workflow (ID) Execution Name (ID) Initiator @@ -42,25 +60,28 @@ { text: 'KILLED', value: 'KILLED'}, { text: 'JUST COMPLETED', value: 'JUST COMPLETED'}, { text: 'UNKNOWN', value: 'UNKNOWN'}]" - [nzFilterFn]="filterByStatus" - nzWidth="16%"> + [nzFilterFn]="true" + (nzFilterChange)="onFilterChange($event)" + nzWidth="13%"> Status Time Used (hh:mm:ss) + [nzShowSort]="true" + [nzSortFn]="true" + [nzSortDirections]="['ascend', 'descend', null]" + (nzSortOrderChange)="onSortChange('end_time', $event)" + nzWidth="20%"> End Time - Action + Action
- + {{ maxStringLength(execution.workflowName, 16) }} ({{ execution.workflowId }})
@@ -126,7 +147,7 @@ nzType="redo">