Skip to content

Commit

Permalink
Merge branch 'master' into yunyad-remove-redundancy
Browse files Browse the repository at this point in the history
  • Loading branch information
yunyad authored Jan 15, 2025
2 parents c4fbcfc + 9b9c200 commit f0d3709
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 1 deletion.
Binary file added core/gui/src/assets/operator_images/If.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import edu.uci.ics.amber.operator.huggingFace.{
HuggingFaceSpamSMSDetectionOpDesc,
HuggingFaceTextSummarizationOpDesc
}
import edu.uci.ics.amber.operator.ifStatement.IfOpDesc
import edu.uci.ics.amber.operator.intersect.IntersectOpDesc
import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc
import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
Expand Down Expand Up @@ -116,6 +117,7 @@ trait StateTransferFunc
)
@JsonSubTypes(
Array(
new Type(value = classOf[IfOpDesc], name = "If"),
new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package edu.uci.ics.amber.operator.ifStatement

import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.{
InputPort,
OutputPort,
PhysicalOp,
PortIdentity,
SchemaPropagationFunc
}
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.amber.util.JSONUtils.objectMapper

class IfOpDesc extends LogicalOp {
@JsonProperty(required = true)
@JsonSchemaTitle("Condition State")
@JsonPropertyDescription("name of the state variable to evaluate")
var conditionName: String = _

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithClassName(
"edu.uci.ics.amber.operator.ifStatement.IfOpExec",
objectMapper.writeValueAsString(this)
)
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withParallelizable(false)
.withPropagateSchema(
SchemaPropagationFunc(inputSchemas =>
operatorInfo.outputPorts
.map(_.id)
.map(id => id -> inputSchemas(operatorInfo.inputPorts.last.id))
.toMap
)
)
}

override def operatorInfo: OperatorInfo =
OperatorInfo(
"If",
"If",
OperatorGroupConstants.CONTROL_GROUP,
inputPorts = List(
InputPort(PortIdentity(), "Condition"),
InputPort(PortIdentity(1), dependencies = List(PortIdentity()))
),
outputPorts = List(OutputPort(PortIdentity(), "False"), OutputPort(PortIdentity(1), "True"))
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package edu.uci.ics.amber.operator.ifStatement

import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.marker.State
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.util.JSONUtils.objectMapper

class IfOpExec(descString: String) extends OperatorExecutor {
private val desc: IfOpDesc = objectMapper.readValue(descString, classOf[IfOpDesc])
private var outputPort: PortIdentity = PortIdentity(1) // by default, it should be the true port.

//This function can handle one or more states.
//The state can have mutiple key-value pairs. Keys are not identified by conditionName will be ignored.
//It can accept any value that can be converted to a boolean. For example, Int 1 will be converted to true.
override def processState(state: State, port: Int): Option[State] = {
outputPort =
if (state.get(desc.conditionName).asInstanceOf[Boolean]) PortIdentity(1) else PortIdentity()
Some(state)
}

override def processTupleMultiPort(
tuple: Tuple,
port: Int
): Iterator[(TupleLike, Option[PortIdentity])] =
Iterator((tuple, Some(outputPort)))

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object OperatorGroupConstants {
final val JAVA_GROUP = "Java"
final val R_GROUP = "R"
final val MACHINE_LEARNING_GENERAL_GROUP = "Machine Learning General"
final val CONTROL_GROUP = "Control Block"

/**
* The order of the groups to show up in the frontend operator panel.
Expand All @@ -46,6 +47,7 @@ object OperatorGroupConstants {
GroupInfo(UTILITY_GROUP),
GroupInfo(API_GROUP),
GroupInfo(UDF_GROUP, List(GroupInfo(PYTHON_GROUP), GroupInfo(JAVA_GROUP), GroupInfo(R_GROUP))),
GroupInfo(VISUALIZATION_GROUP)
GroupInfo(VISUALIZATION_GROUP),
GroupInfo(CONTROL_GROUP)
)
}

0 comments on commit f0d3709

Please sign in to comment.