Skip to content

Commit

Permalink
Fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunwoo Park committed Jan 9, 2025
1 parent 48fd2a5 commit 03a966d
Showing 1 changed file with 13 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,14 @@ class DefaultCostEstimatorSpec
operatorExecution
}

private def createOperatorRuntimeStatisticsEntry(
operatorExecutionId: ULong,
dataTime: Long,
controlTime: Long,
numWorkers: Int
): OperatorRuntimeStatistics = {
val operatorRuntimeStatistics = new OperatorRuntimeStatistics
operatorRuntimeStatistics.setOperatorExecutionId(operatorExecutionId)
operatorRuntimeStatistics.setDataProcessingTime(ULong.valueOf(dataTime))
operatorRuntimeStatistics.setControlProcessingTime(ULong.valueOf(controlTime))
operatorRuntimeStatistics.setNumWorkers(UInteger.valueOf(numWorkers))
operatorRuntimeStatistics
}

private val headerlessCsvOpRuntimeStatisticsEntry: OperatorRuntimeStatistics = {
val operatorRuntimeStatistics = new OperatorRuntimeStatistics
operatorRuntimeStatistics.setOperatorExecutionId(
headerlessCsvOperatorExecutionEntry.getOperatorExecutionId
)
operatorRuntimeStatistics.setDataProcessingTime(ULong.valueOf(100))
operatorRuntimeStatistics.setControlProcessingTime(ULong.valueOf(100))
operatorRuntimeStatistics.setNumWorkers(UInteger.valueOf(1))
operatorRuntimeStatistics
}

Expand All @@ -122,6 +109,7 @@ class DefaultCostEstimatorSpec
)
operatorRuntimeStatistics.setDataProcessingTime(ULong.valueOf(300))
operatorRuntimeStatistics.setControlProcessingTime(ULong.valueOf(300))
operatorRuntimeStatistics.setNumWorkers(UInteger.valueOf(1))
operatorRuntimeStatistics
}

Expand All @@ -132,6 +120,7 @@ class DefaultCostEstimatorSpec
)
operatorRuntimeStatistics.setDataProcessingTime(ULong.valueOf(1000))
operatorRuntimeStatistics.setControlProcessingTime(ULong.valueOf(1000))
operatorRuntimeStatistics.setNumWorkers(UInteger.valueOf(1))
operatorRuntimeStatistics
}

Expand Down Expand Up @@ -201,7 +190,6 @@ class DefaultCostEstimatorSpec
val workflowDao = new WorkflowDao(getDSLContext.configuration())
val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration())
val operatorExecutionsDao = new OperatorExecutionsDao(getDSLContext.configuration())
val operatorRuntimeStatisticsDao =
new OperatorRuntimeStatisticsDao(getDSLContext.configuration())

Expand All @@ -210,16 +198,14 @@ class DefaultCostEstimatorSpec
workflowVersionDao.insert(testWorkflowVersionEntry)
workflowExecutionsDao.insert(testWorkflowExecutionEntry)

val headerlessCsvExecutionId =
insertOperatorExecutionAndGetId(getDSLContext, headerlessCsvOperatorExecutionEntry)
val keywordExecutionId =
insertOperatorExecutionAndGetId(getDSLContext, keywordOperatorExecutionEntry)
insertOperatorExecutionAndGetId(getDSLContext, headerlessCsvOperatorExecutionEntry)
insertOperatorExecutionAndGetId(getDSLContext, keywordOperatorExecutionEntry)

operatorRuntimeStatisticsDao.insert(
createOperatorRuntimeStatisticsEntry(headerlessCsvExecutionId, 100, 100, 1)
headerlessCsvOpRuntimeStatisticsEntry
)
operatorRuntimeStatisticsDao.insert(
createOperatorRuntimeStatisticsEntry(keywordExecutionId, 300, 300, 1)
keywordOpDescRuntimeStatisticsEntry
)

val costEstimator = new DefaultCostEstimator(
Expand Down Expand Up @@ -262,7 +248,6 @@ class DefaultCostEstimatorSpec
val workflowDao = new WorkflowDao(getDSLContext.configuration())
val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration())
val operatorExecutionsDao = new OperatorExecutionsDao(getDSLContext.configuration())
val operatorRuntimeStatisticsDao =
new OperatorRuntimeStatisticsDao(getDSLContext.configuration())

Expand All @@ -271,21 +256,18 @@ class DefaultCostEstimatorSpec
workflowVersionDao.insert(testWorkflowVersionEntry)
workflowExecutionsDao.insert(testWorkflowExecutionEntry)

val headerlessCsvExecutionId =
insertOperatorExecutionAndGetId(getDSLContext, headerlessCsvOperatorExecutionEntry)
val groupByExecutionId =
insertOperatorExecutionAndGetId(getDSLContext, groupByOperatorExecutionEntry)
val keywordExecutionId =
insertOperatorExecutionAndGetId(getDSLContext, keywordOperatorExecutionEntry)
insertOperatorExecutionAndGetId(getDSLContext, headerlessCsvOperatorExecutionEntry)
insertOperatorExecutionAndGetId(getDSLContext, groupByOperatorExecutionEntry)
insertOperatorExecutionAndGetId(getDSLContext, keywordOperatorExecutionEntry)

operatorRuntimeStatisticsDao.insert(
createOperatorRuntimeStatisticsEntry(headerlessCsvExecutionId, 100, 100, 1)
headerlessCsvOpRuntimeStatisticsEntry
)
operatorRuntimeStatisticsDao.insert(
createOperatorRuntimeStatisticsEntry(groupByExecutionId, 1000, 1000, 1)
groupByOpDescRuntimeStatisticsEntry
)
operatorRuntimeStatisticsDao.insert(
createOperatorRuntimeStatisticsEntry(keywordExecutionId, 300, 300, 1)
keywordOpDescRuntimeStatisticsEntry
)

// Should contain two regions, one with CSV->localAgg->globalAgg, another with keyword->sink
Expand Down

0 comments on commit 03a966d

Please sign in to comment.