Skip to content

Commit

Permalink
Get activity variables working
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 11, 2024
1 parent d8a0ee5 commit 3224ca0
Show file tree
Hide file tree
Showing 24 changed files with 490 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import java.util.UUID;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.variables.ReadableVariableStore;
import org.polypheny.db.workflow.dag.variables.VariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.models.EdgeModel;
Expand Down Expand Up @@ -79,6 +80,8 @@ public interface Workflow {

WorkflowState getState();

VariableStore getVariables();

/**
* Sets the state of this workflow to the specified value.
* A workflow should not change its state by itself (after initialization).
Expand All @@ -89,15 +92,34 @@ public interface Workflow {
*/
void setState( WorkflowState state );

/**
* Updates the inTypePreview, outTypePreview and settingsPreview of the specified activity,
* if it is not already executed.
* To get consistent results, updates should be called in topological order.
* If the update resulted in an invalid state, the outTypePreview and settingsPreview is not updated.
*
* @param activityId the activity whose previews will be updated
*/
void updatePreview( UUID activityId );

/**
* Updates the inTypePreview, outTypePreview and settingsPreview of the specified activity,
* if it is not already executed.
* To get consistent results, updates should be called in topological order.
* If the update resulted in an invalid state, an exception is thrown.
*
* @param activityId the activity whose previews will be updated
*/
void updateValidPreview( UUID activityId ) throws ActivityException;

/**
* Recomputes the variables of the specified activity based on the variables stored in its input activities
* and the edge state.
* The resulting variables might not yet be stable.
*
* @param activityId the activity whose variables will be recomputed
* @return a readable view of the recomputed variable store
*/
ReadableVariableStore recomputeInVariables( UUID activityId );
void recomputeInVariables( UUID activityId );

/**
* Returns true if all edges that could change the variables of the specified activity are
Expand Down Expand Up @@ -147,7 +169,7 @@ default WorkflowModel toModel( boolean includeState ) {
WorkflowState state = includeState ? getState() : null;
return new WorkflowModel( getActivities().stream().map( a -> a.toModel( includeState ) ).toList(),
getEdges().stream().map( e -> e.toModel( includeState ) ).toList(),
getConfig(), state );
getConfig(), getVariables().getVariables(), state );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.polypheny.db.workflow.dag;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -33,12 +34,14 @@
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.util.graph.CycleDetector;
import org.polypheny.db.util.graph.TopologicalOrderIterator;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
import org.polypheny.db.workflow.dag.variables.ReadableVariableStore;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.dag.variables.VariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
import org.polypheny.db.workflow.engine.storage.StorageManager;
Expand All @@ -52,23 +55,28 @@ public class WorkflowImpl implements Workflow {

private final Map<UUID, ActivityWrapper> activities;
private final Map<Pair<UUID, UUID>, List<Edge>> edges;
@Getter
private final WorkflowConfigModel config;
@Getter
@Setter
private WorkflowState state = WorkflowState.IDLE;
@Getter
private final VariableStore variables = new VariableStore(); // contains "static" variables (= defined before execution starts)


public WorkflowImpl() {
this( new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), WorkflowConfigModel.of() );
this( new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), WorkflowConfigModel.of(), Map.of() );
}


private WorkflowImpl( Map<UUID, ActivityWrapper> activities, Map<Pair<UUID, UUID>, List<Edge>> edges, WorkflowConfigModel config ) {
private WorkflowImpl( Map<UUID, ActivityWrapper> activities, Map<Pair<UUID, UUID>, List<Edge>> edges, WorkflowConfigModel config, Map<String, JsonNode> variables ) {
this.activities = activities;
this.edges = edges;
this.config = config;
this.variables.reset( variables );

// TODO: compute previews & variables
TopologicalOrderIterator.of( toDag() ).forEach( this::updatePreview );
}


Expand All @@ -86,7 +94,7 @@ public static Workflow fromModel( WorkflowModel model ) {
edgeList.add( Edge.fromModel( e, activities ) );
}

return new WorkflowImpl( activities, edges, model.getConfig() );
return new WorkflowImpl( activities, edges, model.getConfig(), model.getVariables() );
}


Expand Down Expand Up @@ -177,16 +185,46 @@ public DataEdge getDataEdge( UUID to, int toPort ) {


@Override
public WorkflowConfigModel getConfig() {
return config;
public void updatePreview( UUID activityId ) {
try {
updatePreview( activityId, false );
} catch ( ActivityException ignored ) {
assert false;
}
}


@Override
public void updateValidPreview( UUID activityId ) throws ActivityException {
updatePreview( activityId, true );
}


private void updatePreview( UUID activityId, boolean throwIfInvalid ) throws ActivityException {
ActivityWrapper wrapper = getActivity( activityId );
if ( wrapper.getState().isExecuted() ) {
return; // when an activity can be executed, it's previews won't change anymore
}
recomputeInVariables( activityId );
List<Optional<AlgDataType>> inTypes = getInputTypes( activityId );
wrapper.setInTypePreview( inTypes );
try {
SettingsPreview settings = wrapper.updateOutTypePreview( inTypes, hasStableInVariables( activityId ) );
wrapper.setSettingsPreview( settings );
} catch ( ActivityException e ) {
if ( throwIfInvalid ) {
throw e;
} else {
e.printStackTrace(); // TODO: make sure ignoring inconsistency is okay
}
}
}


@Override
public ReadableVariableStore recomputeInVariables( UUID activityId ) {
public void recomputeInVariables( UUID activityId ) {
ActivityWrapper wrapper = activities.get( activityId );
wrapper.getVariables().mergeInputStores( getInEdges( activityId ), wrapper.getDef().getInPorts().length );
return wrapper.getVariables();
wrapper.getVariables().mergeInputStores( getInEdges( activityId ), wrapper.getDef().getInPorts().length, variables );
}


Expand All @@ -207,13 +245,15 @@ public List<Optional<AlgDataType>> getInputTypes( UUID activityId ) {

for ( int i = 0; i < getInPortCount( activityId ); i++ ) {
DataEdge dataEdge = getDataEdge( activityId, i );
if ( dataEdge.getState() == EdgeState.INACTIVE ) {
if ( dataEdge == null ) {
inputTypes.add( Optional.empty() ); // not yet connected
} else if ( dataEdge.getState() == EdgeState.INACTIVE ) {
inputTypes.add( null );
} else {
inputTypes.add( dataEdge.getFrom().getOutTypePreview().get( dataEdge.getFromPort() ) );
}
}
return inputTypes;
return Collections.unmodifiableList( inputTypes );
}


Expand All @@ -229,6 +269,7 @@ public void addActivity( ActivityWrapper activity ) {
throw new GenericRuntimeException( "Cannot add activity instance that is already part of this workflow." );
}
activities.put( activity.getId(), activity );
updatePreview( activity.getId() ); // creates empty previews
}


Expand All @@ -246,6 +287,7 @@ public void deleteEdge( EdgeModel model ) {
return;
}
edgeList.removeIf( e -> e.isEquivalent( model ) );
// TODO: reset target activity and all successors, update previews
}


Expand Down Expand Up @@ -341,8 +383,7 @@ public void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID,
}
}

// compatible settings
// TODO: verify succesors of idle nodes are idle as well
// compatible settings ?

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ public class ActivityWrapper {
@Setter
private ActivityState state = ActivityState.IDLE;

private final VariableStore variables; // depending on state, this either represents the variables before (possibly not yet stable) or after execution (always stable)
private final VariableStore variables = new VariableStore(); // depending on state, this either represents the variables before (possibly not yet stable) or after execution (always stable)
@Setter
private List<Optional<AlgDataType>> outTypePreview; // TODO: ensure this is always up to date
@Setter
private List<Optional<AlgDataType>> inTypePreview; // contains the (possibly not yet known) input type
@Setter
private SettingsPreview settingsPreview; // contains the (possibly not yet known) settings


protected ActivityWrapper( UUID id, Activity activity, String type, Map<String, JsonNode> settings, ActivityConfigModel config, RenderModel rendering ) {
Expand All @@ -66,8 +70,6 @@ protected ActivityWrapper( UUID id, Activity activity, String type, Map<String,
this.serializableSettings = settings;
this.config = config;
this.rendering = rendering;

this.variables = new VariableStore();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@
import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.dag.variables.WritableVariableStore;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;

public interface VariableWriter extends Activity {

@Override
default void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
assert requestsToWrite(
inputs.stream().map( r -> Optional.of( r.getTupleType() ) ).toList(),
SettingsPreview.of( settings )
).orElseThrow() : "Cannot use the default execute implementation of VariableWriter if requestsToWrite returns false.";

throw new GenericRuntimeException( "The standard execute method cannot be called for a VariableWriter that requests to write" );
}

/**
* Whether the activity wants to be able to write variables during execution.
* Activities that implement this interface are typically pure writers, the default return value is thus {@code Optional.of(true)}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.dag.activities.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.VariableWriter;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.IntSetting;
import org.polypheny.db.workflow.dag.annotations.StringSetting;
import org.polypheny.db.workflow.dag.settings.ListValue;
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.dag.settings.StringValue;
import org.polypheny.db.workflow.dag.variables.WritableVariableStore;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;

@ActivityDefinition(type = "fieldNameToVar", displayName = "Extract Field Names", categories = { ActivityCategory.VARIABLES, ActivityCategory.RELATIONAL },
inPorts = { @InPort(type = PortType.REL) }, // TODO: add support for docs
outPorts = {}
)
@StringSetting(key = "variableName", displayName = "Variable Name", defaultValue = "field_names", minLength = 1, maxLength = 128)
@IntSetting(key = "fields", displayName = "Target Fields", isList = true, defaultValue = 0, min = 0)
public class FieldNameToVariableActivity implements VariableWriter {

@Override
public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) throws ActivityException {
return List.of();
}


@Override
public void reset() {

}


@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContextImpl ctx, WritableVariableStore writer ) {
String variableName = settings.get( "variableName", StringValue.class ).getValue();
List<Integer> targetFields = settings.get( "fields", ListValue.class ).getValues();

List<String> fieldNames = inputs.get( 0 ).getTupleType().getFieldNames();
List<String> values;
if ( targetFields.isEmpty() ) {
values = fieldNames;
} else {
values = new ArrayList<>();
for ( int i : targetFields ) {
values.add( fieldNames.get( i ) );
}
}
writer.setVariable( variableName, values );
}

}
Loading

0 comments on commit 3224ca0

Please sign in to comment.