Skip to content

Commit

Permalink
Get basic scheduler functionality running
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 6, 2024
1 parent 2975992 commit c88a077
Show file tree
Hide file tree
Showing 39 changed files with 1,412 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.polypheny.db.workflow.dag;

import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.variables.ReadableVariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
Expand Down Expand Up @@ -67,6 +70,8 @@ public interface Workflow {

Edge getEdge( EdgeModel model );

Edge getEdge( ExecutionEdge execEdge );

DataEdge getDataEdge( UUID to, int toPort );

WorkflowConfigModel getConfig();
Expand All @@ -83,6 +88,31 @@ public interface Workflow {
*/
void setState( WorkflowState state );

/**
* Recomputes the variables of the specified activity based on the variables stored in its input activities
* and the edge state.
* The resulting variables might not yet be stable.
*
* @param activityId the activity whose variables will be recomputed
* @return a readable view of the recomputed variable store
*/
ReadableVariableStore recomputeInVariables( UUID activityId );

/**
* Returns true if all edges that could change the variables of the specified activity are
* not IDLE (except for ignored edges).
* If the specified activity is not yet executed, calling {@code recomputeInVariables()} more than once
* does not change its variables.
*
* @param activityId the activity whose variables will be recomputed
* @return true if the variables of the activities are stable.
*/
boolean hasStableInVariables( UUID activityId );

List<Optional<AlgDataType>> getInputTypes( UUID activityId );

int getInPortCount( UUID activityId );

void addActivity( ActivityWrapper activity );

void deleteActivity( UUID activityId );
Expand All @@ -91,6 +121,10 @@ public interface Workflow {

AttributedDirectedGraph<UUID, ExecutionEdge> toDag();

void validateStructure() throws Exception;

void validateStructure( AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException;


/**
* Returns a WorkflowModel of this workflow.
Expand All @@ -110,7 +144,8 @@ default WorkflowModel toModel( boolean includeState ) {

enum WorkflowState {
IDLE,
EXECUTING
EXECUTING,
INTERRUPTED
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,29 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.Setter;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.util.Pair;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.util.graph.CycleDetector;
import org.polypheny.db.util.graph.TopologicalOrderIterator;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
import org.polypheny.db.workflow.dag.variables.ReadableVariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType;
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
Expand Down Expand Up @@ -114,12 +123,14 @@ public List<Edge> getEdges( ActivityWrapper from, ActivityWrapper to ) {

@Override
public List<Edge> getInEdges( UUID target ) {
// TODO: make more efficient
return getEdges().stream().filter( e -> e.getTo().getId().equals( target ) ).toList();
}


@Override
public List<Edge> getOutEdges( UUID source ) {
// TODO: make more efficient
return getEdges().stream().filter( e -> e.getFrom().getId().equals( source ) ).toList();
}

Expand All @@ -136,6 +147,20 @@ public Edge getEdge( EdgeModel model ) {
}


@Override
public Edge getEdge( ExecutionEdge execEdge ) {
List<Edge> candidates = edges.get( Pair.of( execEdge.getSource(), execEdge.getTarget() ) );
if ( candidates != null ) {
for ( Edge candidate : candidates ) {
if ( execEdge.representsEdge( candidate ) ) {
return candidate;
}
}
}
return null;
}


@Override
public DataEdge getDataEdge( UUID to, int toPort ) {
for ( Edge edge : getInEdges( to ) ) {
Expand All @@ -155,6 +180,43 @@ public WorkflowConfigModel getConfig() {
}


@Override
public ReadableVariableStore recomputeInVariables( UUID activityId ) {
ActivityWrapper wrapper = activities.get( activityId );
wrapper.getVariables().mergeInputStores( getInEdges( activityId ), wrapper.getDef().getInPorts().length );
return wrapper.getVariables();
}


@Override
public boolean hasStableInVariables( UUID activityId ) {
for ( Edge edge : getInEdges( activityId ) ) {
if ( edge.getState() == EdgeState.IDLE && !edge.isIgnored() ) {
return false;
}
}
return true;
}


@Override
public List<Optional<AlgDataType>> getInputTypes( UUID activityId ) {
List<Optional<AlgDataType>> inputTypes = new ArrayList<>();

for ( int i = 0; i < getInPortCount( activityId ); i++ ) {
DataEdge dataEdge = getDataEdge( activityId, i );
inputTypes.add( dataEdge.getFrom().getOutTypePreview().get( dataEdge.getFromPort() ) );
}
return inputTypes;
}


@Override
public int getInPortCount( UUID activityId ) {
return activities.get( activityId ).getDef().getInPorts().length;
}


@Override
public void addActivity( ActivityWrapper activity ) {
if ( activities.containsKey( activity.getId() ) ) {
Expand Down Expand Up @@ -191,4 +253,78 @@ public AttributedDirectedGraph<UUID, ExecutionEdge> toDag() {
return dag;
}


@Override
public void validateStructure() throws Exception {
validateStructure( toDag() );
}


@Override
public void validateStructure( AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException {
if ( subDag.vertexSet().isEmpty() && subDag.edgeSet().isEmpty() ) {
return;
}

for ( ExecutionEdge execEdge : subDag.edgeSet() ) {
if ( !activities.containsKey( execEdge.getSource() ) || !activities.containsKey( execEdge.getTarget() ) ) {
throw new IllegalStateException( "Source and target activities of an edge must be part of the workflow: " + execEdge );
}
Edge edge = getEdge( execEdge );
if ( edge instanceof DataEdge data && !data.isCompatible() ) {
throw new IllegalStateException( "Incompatible port types for data edge: " + edge );
}
}

if ( !(new CycleDetector<>( subDag ).findCycles().isEmpty()) ) {
throw new IllegalStateException( "A workflow must not contain cycles" );
}

for ( UUID n : TopologicalOrderIterator.of( subDag ) ) {
ActivityWrapper wrapper = getActivity( n );
CommonType type = wrapper.getConfig().getCommonType();
Set<Integer> requiredInPorts = wrapper.getDef().getRequiredInPorts();
Set<Integer> occupiedInPorts = new HashSet<>();
for ( ExecutionEdge execEdge : subDag.getInwardEdges( n ) ) {
ActivityWrapper source = getActivity( execEdge.getSource() );
CommonType sourceType = source.getConfig().getCommonType();
int toPort = execEdge.getToPort();

requiredInPorts.remove( toPort );

if ( occupiedInPorts.contains( toPort ) ) {
throw new IllegalStateException( "InPort " + toPort + " is already occupied: " + execEdge );
}
occupiedInPorts.add( toPort );

if ( wrapper.getState().isExecuted() && !source.getState().isExecuted() ) {
throw new IllegalStateException( "An activity that is executed cannot have a not yet executed predecessor: " + execEdge );
}
if ( type == CommonType.EXTRACT ) {
if ( sourceType != CommonType.EXTRACT ) {
throw new IllegalStateException( "An activity with CommonType EXTRACT must only have EXTRACT predecessors: " + execEdge );
}
if ( execEdge.isControl() && !execEdge.isOnSuccess() ) {
throw new IllegalStateException( "Cannot have a onFail control edge between common EXTRACT activities" + execEdge );
}
} else if ( sourceType == CommonType.LOAD ) {
if ( type != CommonType.LOAD ) {
throw new IllegalStateException( "An activity with CommonType LOAD must only have LOAD successors: " + execEdge );
}
if ( execEdge.isControl() && !execEdge.isOnSuccess() ) {
throw new IllegalStateException( "Cannot have a onFail control edge between common LOAD activities" + execEdge );
}
}

}
if ( !requiredInPorts.isEmpty() ) {
throw new IllegalStateException( "Activity is missing the required data input(s) " + requiredInPorts + ": " + wrapper );
}
}

// compatible settings
// TODO: verify succesors of idle nodes are idle as well

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
import org.polypheny.db.workflow.dag.variables.WritableVariableStore;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;

Expand All @@ -40,6 +39,7 @@ public interface Activity {
* If any available setting or input type results in a contradiction or invalid state,
* an {@link ActivityException} is thrown.
* If a setting, input or output type is available, it is guaranteed to not change anymore.
* This method should be idempotent.
*
* @param inTypes a list of {@link Optional<AlgDataType>} representing the input tuple types.
* @param settings a map of setting keys to {@link Optional<SettingValue>} representing the available settings, i.e. all settings that do not contain variables.
Expand All @@ -54,19 +54,6 @@ static List<Optional<AlgDataType>> wrapType( @Nullable AlgDataType type ) {
}


/**
* This method is called just before execution starts and can be used to write variables based on input tuple types and settings.
* To be able to update variables while having access to the input data, the activity should instead implement {@link VariableWriter}.
*
* @param inTypes a list of {@link AlgDataType} representing the input tuple types.
* @param settings a map of setting keys to {@link SettingValue} representing the settings.
* @param variables a WritableVariableStore to be used for updating any variable values.
*/
default void updateVariables( List<AlgDataType> inTypes, Map<String, SettingValue> settings, WritableVariableStore variables ) {
}

// settings do NOT include values from the updateVariables step.

/**
* Execute this activity.
* Any input CheckpointReaders are provided and expected to be closed by the caller.
Expand Down Expand Up @@ -152,7 +139,7 @@ enum DataStateMerger {
/**
* Computes whether an activity is NOT aborted based on its data edge states.
*
* @param dataEdges the EdgeState of all data inputs of an activity
* @param dataEdges the EdgeState of all data inputs of an activity in port order, null in case no edge is connected
* @return false if the data edge states result in an abort, true otherwise.
*/
public boolean merge( List<EdgeState> dataEdges ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Value;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
Expand Down Expand Up @@ -83,6 +85,18 @@ public PortType[] getOutPortTypes() {
}


@JsonIgnore
public Set<Integer> getRequiredInPorts() {
Set<Integer> set = new HashSet<>();
for ( int i = 0; i < inPorts.length; i++ ) {
if ( !inPorts[i].isOptional ) {
set.add( i );
}
}
return set;
}


public boolean hasCategory( ActivityCategory category ) {
if ( category == null ) {
return false;
Expand Down
Loading

0 comments on commit c88a077

Please sign in to comment.