From c88a0776a33beedf4fe94bc4a1f809d9f3795e36 Mon Sep 17 00:00:00 2001 From: Tobias Weber Date: Fri, 6 Dec 2024 18:52:03 +0100 Subject: [PATCH] Get basic scheduler functionality running --- .../polypheny/db/workflow/dag/Workflow.java | 37 ++- .../db/workflow/dag/WorkflowImpl.java | 136 +++++++++ .../db/workflow/dag/activities/Activity.java | 17 +- .../workflow/dag/activities/ActivityDef.java | 14 + .../dag/activities/ActivityWrapper.java | 90 +++++- .../dag/activities/impl/DebugActivity.java | 71 +++++ .../dag/activities/impl/RelUnionActivity.java | 76 +++++ .../activities/impl/RelValuesActivity.java | 91 ++++++ .../dag/annotations/ActivityDefinition.java | 2 +- .../db/workflow/dag/edges/DataEdge.java | 16 ++ .../polypheny/db/workflow/dag/edges/Edge.java | 5 + .../dag/variables/ReadableVariableStore.java | 3 + .../workflow/dag/variables/VariableStore.java | 105 ++++++- .../dag/variables/WritableVariableStore.java | 10 + .../engine/execution/DefaultExecutor.java | 12 +- .../workflow/engine/execution/Executor.java | 39 +-- .../engine/execution/FusionExecutor.java | 27 +- .../engine/execution/PipeExecutor.java | 29 +- .../execution/VariableWriterExecutor.java | 11 +- .../context/ExecutionContextImpl.java | 2 +- .../engine/scheduler/ExecutionEdge.java | 13 + .../engine/scheduler/ExecutionResult.java | 5 + .../engine/scheduler/ExecutionSubmission.java | 5 +- .../engine/scheduler/GlobalScheduler.java | 12 +- .../workflow/engine/scheduler/GraphUtils.java | 28 ++ .../engine/scheduler/WorkflowScheduler.java | 272 ++++++++++++------ .../optimizer/WorkflowOptimizer.java | 69 ++--- .../optimizer/WorkflowOptimizerImpl.java | 19 +- .../engine/storage/StorageManager.java | 14 +- .../engine/storage/StorageManagerImpl.java | 30 +- .../storage/reader/CheckpointReader.java | 2 +- .../workflow/models/ActivityConfigModel.java | 6 +- .../db/workflow/models/ActivityModel.java | 23 ++ .../db/workflow/models/EdgeModel.java | 16 ++ .../polypheny/db/workflow/WorkflowUtils.java | 73 +++++ .../engine/scheduler/GlobalSchedulerTest.java | 90 ++++++ .../scheduler/WorkflowSchedulerTest.java | 106 +++++++ .../engine/storage/StorageManagerTest.java | 56 +--- .../workflow/engine/storage/StorageUtils.java | 83 ++++++ 39 files changed, 1412 insertions(+), 303 deletions(-) create mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/DebugActivity.java create mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java create mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java create mode 100644 plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java create mode 100644 plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java create mode 100644 plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java create mode 100644 plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java index 0132961a94..b1d8a690a5 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java @@ -17,11 +17,14 @@ package org.polypheny.db.workflow.dag; import java.util.List; +import java.util.Optional; import java.util.UUID; +import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.util.graph.AttributedDirectedGraph; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.dag.edges.DataEdge; import org.polypheny.db.workflow.dag.edges.Edge; +import org.polypheny.db.workflow.dag.variables.ReadableVariableStore; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; import org.polypheny.db.workflow.models.EdgeModel; import org.polypheny.db.workflow.models.WorkflowConfigModel; @@ -67,6 +70,8 @@ public interface Workflow { Edge getEdge( EdgeModel model ); + Edge getEdge( ExecutionEdge execEdge ); + DataEdge getDataEdge( UUID to, int toPort ); WorkflowConfigModel getConfig(); @@ -83,6 +88,31 @@ public interface Workflow { */ void setState( WorkflowState state ); + /** + * Recomputes the variables of the specified activity based on the variables stored in its input activities + * and the edge state. + * The resulting variables might not yet be stable. + * + * @param activityId the activity whose variables will be recomputed + * @return a readable view of the recomputed variable store + */ + ReadableVariableStore recomputeInVariables( UUID activityId ); + + /** + * Returns true if all edges that could change the variables of the specified activity are + * not IDLE (except for ignored edges). + * If the specified activity is not yet executed, calling {@code recomputeInVariables()} more than once + * does not change its variables. + * + * @param activityId the activity whose variables will be recomputed + * @return true if the variables of the activities are stable. + */ + boolean hasStableInVariables( UUID activityId ); + + List> getInputTypes( UUID activityId ); + + int getInPortCount( UUID activityId ); + void addActivity( ActivityWrapper activity ); void deleteActivity( UUID activityId ); @@ -91,6 +121,10 @@ public interface Workflow { AttributedDirectedGraph toDag(); + void validateStructure() throws Exception; + + void validateStructure( AttributedDirectedGraph subDag ) throws IllegalStateException; + /** * Returns a WorkflowModel of this workflow. @@ -110,7 +144,8 @@ default WorkflowModel toModel( boolean includeState ) { enum WorkflowState { IDLE, - EXECUTING + EXECUTING, + INTERRUPTED } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java index 2e8f8c025d..5c13b0ab0a 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java @@ -18,20 +18,29 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.Setter; +import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.catalog.exceptions.GenericRuntimeException; import org.polypheny.db.util.Pair; import org.polypheny.db.util.graph.AttributedDirectedGraph; +import org.polypheny.db.util.graph.CycleDetector; +import org.polypheny.db.util.graph.TopologicalOrderIterator; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.dag.edges.DataEdge; import org.polypheny.db.workflow.dag.edges.Edge; +import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; +import org.polypheny.db.workflow.dag.variables.ReadableVariableStore; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory; +import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; import org.polypheny.db.workflow.models.ActivityModel; import org.polypheny.db.workflow.models.EdgeModel; import org.polypheny.db.workflow.models.WorkflowConfigModel; @@ -114,12 +123,14 @@ public List getEdges( ActivityWrapper from, ActivityWrapper to ) { @Override public List getInEdges( UUID target ) { + // TODO: make more efficient return getEdges().stream().filter( e -> e.getTo().getId().equals( target ) ).toList(); } @Override public List getOutEdges( UUID source ) { + // TODO: make more efficient return getEdges().stream().filter( e -> e.getFrom().getId().equals( source ) ).toList(); } @@ -136,6 +147,20 @@ public Edge getEdge( EdgeModel model ) { } + @Override + public Edge getEdge( ExecutionEdge execEdge ) { + List candidates = edges.get( Pair.of( execEdge.getSource(), execEdge.getTarget() ) ); + if ( candidates != null ) { + for ( Edge candidate : candidates ) { + if ( execEdge.representsEdge( candidate ) ) { + return candidate; + } + } + } + return null; + } + + @Override public DataEdge getDataEdge( UUID to, int toPort ) { for ( Edge edge : getInEdges( to ) ) { @@ -155,6 +180,43 @@ public WorkflowConfigModel getConfig() { } + @Override + public ReadableVariableStore recomputeInVariables( UUID activityId ) { + ActivityWrapper wrapper = activities.get( activityId ); + wrapper.getVariables().mergeInputStores( getInEdges( activityId ), wrapper.getDef().getInPorts().length ); + return wrapper.getVariables(); + } + + + @Override + public boolean hasStableInVariables( UUID activityId ) { + for ( Edge edge : getInEdges( activityId ) ) { + if ( edge.getState() == EdgeState.IDLE && !edge.isIgnored() ) { + return false; + } + } + return true; + } + + + @Override + public List> getInputTypes( UUID activityId ) { + List> inputTypes = new ArrayList<>(); + + for ( int i = 0; i < getInPortCount( activityId ); i++ ) { + DataEdge dataEdge = getDataEdge( activityId, i ); + inputTypes.add( dataEdge.getFrom().getOutTypePreview().get( dataEdge.getFromPort() ) ); + } + return inputTypes; + } + + + @Override + public int getInPortCount( UUID activityId ) { + return activities.get( activityId ).getDef().getInPorts().length; + } + + @Override public void addActivity( ActivityWrapper activity ) { if ( activities.containsKey( activity.getId() ) ) { @@ -191,4 +253,78 @@ public AttributedDirectedGraph toDag() { return dag; } + + @Override + public void validateStructure() throws Exception { + validateStructure( toDag() ); + } + + + @Override + public void validateStructure( AttributedDirectedGraph subDag ) throws IllegalStateException { + if ( subDag.vertexSet().isEmpty() && subDag.edgeSet().isEmpty() ) { + return; + } + + for ( ExecutionEdge execEdge : subDag.edgeSet() ) { + if ( !activities.containsKey( execEdge.getSource() ) || !activities.containsKey( execEdge.getTarget() ) ) { + throw new IllegalStateException( "Source and target activities of an edge must be part of the workflow: " + execEdge ); + } + Edge edge = getEdge( execEdge ); + if ( edge instanceof DataEdge data && !data.isCompatible() ) { + throw new IllegalStateException( "Incompatible port types for data edge: " + edge ); + } + } + + if ( !(new CycleDetector<>( subDag ).findCycles().isEmpty()) ) { + throw new IllegalStateException( "A workflow must not contain cycles" ); + } + + for ( UUID n : TopologicalOrderIterator.of( subDag ) ) { + ActivityWrapper wrapper = getActivity( n ); + CommonType type = wrapper.getConfig().getCommonType(); + Set requiredInPorts = wrapper.getDef().getRequiredInPorts(); + Set occupiedInPorts = new HashSet<>(); + for ( ExecutionEdge execEdge : subDag.getInwardEdges( n ) ) { + ActivityWrapper source = getActivity( execEdge.getSource() ); + CommonType sourceType = source.getConfig().getCommonType(); + int toPort = execEdge.getToPort(); + + requiredInPorts.remove( toPort ); + + if ( occupiedInPorts.contains( toPort ) ) { + throw new IllegalStateException( "InPort " + toPort + " is already occupied: " + execEdge ); + } + occupiedInPorts.add( toPort ); + + if ( wrapper.getState().isExecuted() && !source.getState().isExecuted() ) { + throw new IllegalStateException( "An activity that is executed cannot have a not yet executed predecessor: " + execEdge ); + } + if ( type == CommonType.EXTRACT ) { + if ( sourceType != CommonType.EXTRACT ) { + throw new IllegalStateException( "An activity with CommonType EXTRACT must only have EXTRACT predecessors: " + execEdge ); + } + if ( execEdge.isControl() && !execEdge.isOnSuccess() ) { + throw new IllegalStateException( "Cannot have a onFail control edge between common EXTRACT activities" + execEdge ); + } + } else if ( sourceType == CommonType.LOAD ) { + if ( type != CommonType.LOAD ) { + throw new IllegalStateException( "An activity with CommonType LOAD must only have LOAD successors: " + execEdge ); + } + if ( execEdge.isControl() && !execEdge.isOnSuccess() ) { + throw new IllegalStateException( "Cannot have a onFail control edge between common LOAD activities" + execEdge ); + } + } + + } + if ( !requiredInPorts.isEmpty() ) { + throw new IllegalStateException( "Activity is missing the required data input(s) " + requiredInPorts + ": " + wrapper ); + } + } + + // compatible settings + // TODO: verify succesors of idle nodes are idle as well + + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/Activity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/Activity.java index 396ccfb51b..7b72083482 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/Activity.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/Activity.java @@ -27,7 +27,6 @@ import org.polypheny.db.catalog.logistic.DataModel; import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; -import org.polypheny.db.workflow.dag.variables.WritableVariableStore; import org.polypheny.db.workflow.engine.execution.context.ExecutionContext; import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; @@ -40,6 +39,7 @@ public interface Activity { * If any available setting or input type results in a contradiction or invalid state, * an {@link ActivityException} is thrown. * If a setting, input or output type is available, it is guaranteed to not change anymore. + * This method should be idempotent. * * @param inTypes a list of {@link Optional} representing the input tuple types. * @param settings a map of setting keys to {@link Optional} representing the available settings, i.e. all settings that do not contain variables. @@ -54,19 +54,6 @@ static List> wrapType( @Nullable AlgDataType type ) { } - /** - * This method is called just before execution starts and can be used to write variables based on input tuple types and settings. - * To be able to update variables while having access to the input data, the activity should instead implement {@link VariableWriter}. - * - * @param inTypes a list of {@link AlgDataType} representing the input tuple types. - * @param settings a map of setting keys to {@link SettingValue} representing the settings. - * @param variables a WritableVariableStore to be used for updating any variable values. - */ - default void updateVariables( List inTypes, Map settings, WritableVariableStore variables ) { - } - - // settings do NOT include values from the updateVariables step. - /** * Execute this activity. * Any input CheckpointReaders are provided and expected to be closed by the caller. @@ -152,7 +139,7 @@ enum DataStateMerger { /** * Computes whether an activity is NOT aborted based on its data edge states. * - * @param dataEdges the EdgeState of all data inputs of an activity + * @param dataEdges the EdgeState of all data inputs of an activity in port order, null in case no edge is connected * @return false if the data edge states result in an abort, true otherwise. */ public boolean merge( List dataEdges ) { diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityDef.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityDef.java index a65ff1c5a7..3eed071781 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityDef.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityDef.java @@ -19,9 +19,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import java.lang.annotation.Annotation; import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import lombok.Value; import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory; import org.polypheny.db.workflow.dag.activities.Activity.PortType; @@ -83,6 +85,18 @@ public PortType[] getOutPortTypes() { } + @JsonIgnore + public Set getRequiredInPorts() { + Set set = new HashSet<>(); + for ( int i = 0; i < inPorts.length; i++ ) { + if ( !inPorts[i].isOptional ) { + set.add( i ); + } + } + return set; + } + + public boolean hasCategory( ActivityCategory category ) { if ( category == null ) { return false; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java index b0b214a64e..1307221204 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java @@ -17,12 +17,19 @@ package org.polypheny.db.workflow.dag.activities; import com.fasterxml.jackson.databind.JsonNode; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import lombok.Getter; import lombok.Setter; +import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.workflow.dag.activities.Activity.ControlStateMerger; +import org.polypheny.db.workflow.dag.edges.ControlEdge; +import org.polypheny.db.workflow.dag.edges.DataEdge; +import org.polypheny.db.workflow.dag.edges.Edge; +import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; import org.polypheny.db.workflow.dag.variables.VariableStore; import org.polypheny.db.workflow.models.ActivityConfigModel; @@ -41,15 +48,13 @@ public class ActivityWrapper { private ActivityConfigModel config; @Setter private RenderModel rendering; - private final VariableStore variables; // depending on state, this either represents the variables before or after execution - - /** - * After initialization, the state should never be changed by the wrapper itself. - * The state is typically changed by the scheduler. - */ @Setter private ActivityState state = ActivityState.IDLE; + private final VariableStore variables; // depending on state, this either represents the variables before (possibly not yet stable) or after execution (always stable) + @Setter + private List> outTypePreview; // TODO: ensure this is always up to date + protected ActivityWrapper( UUID id, Activity activity, String type, Map settings, ActivityConfigModel config, RenderModel rendering ) { this.activity = activity; @@ -72,9 +77,19 @@ public Map resolveSettings() { return ActivityRegistry.buildSettingValues( type, variables.resolveVariables( serializableSettings ) ); } + // TODO: be careful to use correct variables (must be sure they are correct) + + + public Map> resolveAvailableSettings( boolean hasStableVariables ) { + VariableStore store = hasStableVariables ? variables : new VariableStore(); + return ActivityRegistry.buildAvailableSettingValues( type, store.resolveAvailableVariables( serializableSettings ) ); + } + - public Map> resolveAvailableSettings() { - return ActivityRegistry.buildAvailableSettingValues( type, variables.resolveAvailableVariables( serializableSettings ) ); + public Map> updateOutTypePreview( List> inTypePreviews, boolean hasStableVariables ) throws ActivityException { + Map> settings = resolveAvailableSettings( hasStableVariables ); + outTypePreview = activity.previewOutTypes( inTypePreviews, settings ); + return settings; } @@ -98,19 +113,64 @@ public ControlStateMerger getControlStateMerger() { } + public EdgeState canExecute( List inEdges ) { + EdgeState[] dataEdges = new EdgeState[getDef().getInPorts().length]; + List successEdges = new ArrayList<>(); + List failEdges = new ArrayList<>(); + for ( Edge edge : inEdges ) { + if ( edge instanceof DataEdge data ) { + dataEdges[data.getToPort()] = data.getState(); + } else if ( edge instanceof ControlEdge control ) { + List list = control.isOnSuccess() ? successEdges : failEdges; + list.add( control.getState() ); + } + } + + if ( !activity.getDataStateMerger().merge( List.of( dataEdges ) ) ) { + return EdgeState.INACTIVE; + } + return getControlStateMerger().merge( successEdges, failEdges ); + } + + + public void resetExecution() { + activity.reset(); + variables.clear(); + state = ActivityState.IDLE; + } + + public static ActivityWrapper fromModel( ActivityModel model ) { return new ActivityWrapper( model.getId(), ActivityRegistry.activityFromType( model.getType() ), model.getType(), model.getSettings(), model.getConfig(), model.getRendering() ); } + @Override + public String toString() { + return "ActivityWrapper{" + + "type='" + type + '\'' + + ", id=" + id + + ", state=" + state + + '}'; + } + + public enum ActivityState { - IDLE, - QUEUED, - EXECUTING, - SKIPPED, // => execution was aborted - FAILED, - FINISHED, - SAVED // => finished + checkpoint created + IDLE( false ), + QUEUED( false ), + EXECUTING( false ), + SKIPPED( false ), // => execution was aborted + FAILED( true ), + FINISHED( true ), + SAVED( true ); // => finished + checkpoint created + + @Getter + private final boolean isExecuted; + + + ActivityState( boolean isExecuted ) { + this.isExecuted = isExecuted; + } } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/DebugActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/DebugActivity.java new file mode 100644 index 0000000000..a60667f24b --- /dev/null +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/DebugActivity.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.dag.activities.impl; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.polypheny.db.algebra.type.AlgDataType; +import org.polypheny.db.workflow.dag.activities.Activity; +import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory; +import org.polypheny.db.workflow.dag.activities.Activity.PortType; +import org.polypheny.db.workflow.dag.activities.ActivityException; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort; +import org.polypheny.db.workflow.dag.annotations.IntSetting; +import org.polypheny.db.workflow.dag.settings.IntValue; +import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; +import org.polypheny.db.workflow.engine.execution.context.ExecutionContext; +import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; +import org.polypheny.db.workflow.engine.storage.reader.RelReader; +import org.polypheny.db.workflow.engine.storage.writer.RelWriter; + +@ActivityDefinition(type = "debug", displayName = "Debugging", categories = { ActivityCategory.TRANSFORM }, + inPorts = { @InPort(type = PortType.REL) }, + outPorts = { @OutPort(type = PortType.REL) } +) + +@IntSetting(key = "delay", displayName = "Delay (ms)", defaultValue = 1000) +public class DebugActivity implements Activity { + + + public DebugActivity() { + } + + + @Override + public List> previewOutTypes( List> inTypes, Map> settings ) throws ActivityException { + return List.of( inTypes.get( 0 ) ); + } + + + @Override + public void execute( List inputs, Map settings, ExecutionContext ctx ) throws Exception { + RelReader input = (RelReader) inputs.get( 0 ); + try ( RelWriter output = ctx.createRelWriter( 0, input.getTupleType(), false ) ) { + Thread.sleep( settings.get( "delay" ).unwrapOrThrow( IntValue.class ).getValue() ); + output.write( input.getIterator() ); + } + } + + + @Override + public void reset() { + } + +} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java new file mode 100644 index 0000000000..5266a05df9 --- /dev/null +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java @@ -0,0 +1,76 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.dag.activities.impl; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.polypheny.db.algebra.type.AlgDataType; +import org.polypheny.db.workflow.dag.activities.Activity; +import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory; +import org.polypheny.db.workflow.dag.activities.Activity.PortType; +import org.polypheny.db.workflow.dag.activities.ActivityException; +import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidInputException; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort; +import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; +import org.polypheny.db.workflow.engine.execution.context.ExecutionContext; +import org.polypheny.db.workflow.engine.storage.reader.CheckpointQuery; +import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; +import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter; + +@ActivityDefinition(type = "relUnion", displayName = "Relational Union", categories = { ActivityCategory.TRANSFORM }, + inPorts = { @InPort(type = PortType.REL), @InPort(type = PortType.REL) }, + outPorts = { @OutPort(type = PortType.REL) } +) +public class RelUnionActivity implements Activity { + + @Override + public List> previewOutTypes( List> inTypes, Map> settings ) throws ActivityException { + + Optional firstType = inTypes.get( 0 ); + Optional secondType = inTypes.get( 1 ); + if ( firstType.isEmpty() || secondType.isEmpty() ) { + return List.of( Optional.empty() ); + } + if ( !firstType.get().equals( secondType.get() ) ) { + throw new InvalidInputException( "The second input type is not equal to the first input type", 1 ); + } + return List.of( firstType ); + } + + + @Override + public void execute( List inputs, Map settings, ExecutionContext ctx ) throws Exception { + CheckpointQuery query = CheckpointQuery.builder() + .queryLanguage( "SQL" ) + .query( "SELECT * FROM " + CheckpointQuery.ENTITY( 0 ) + " UNION ALL SELECT * FROM " + CheckpointQuery.ENTITY( 1 ) ) + .build(); + try ( CheckpointWriter writer = ctx.createRelWriter( 0, inputs.get( 0 ).getTupleType(), true ) ) { + writer.write( inputs.get( 0 ).getIteratorFromQuery( query, inputs ) ); + } + + } + + + @Override + public void reset() { + + } + +} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java new file mode 100644 index 0000000000..d35503876d --- /dev/null +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java @@ -0,0 +1,91 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.dag.activities.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.polypheny.db.algebra.type.AlgDataType; +import org.polypheny.db.algebra.type.AlgDataTypeFactory; +import org.polypheny.db.type.PolyType; +import org.polypheny.db.type.entity.PolyString; +import org.polypheny.db.type.entity.PolyValue; +import org.polypheny.db.type.entity.numerical.PolyInteger; +import org.polypheny.db.workflow.dag.activities.Activity; +import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory; +import org.polypheny.db.workflow.dag.activities.Activity.PortType; +import org.polypheny.db.workflow.dag.activities.ActivityException; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition; +import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort; +import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; +import org.polypheny.db.workflow.engine.execution.context.ExecutionContext; +import org.polypheny.db.workflow.engine.storage.StorageManager; +import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; +import org.polypheny.db.workflow.engine.storage.writer.RelWriter; + +@ActivityDefinition(type = "relValues", displayName = "Constant Table", categories = { ActivityCategory.EXTRACT }, + inPorts = {}, + outPorts = { @OutPort(type = PortType.REL) } +) +public class RelValuesActivity implements Activity { + + @Override + public List> previewOutTypes( List> inTypes, Map> settings ) throws ActivityException { + return Activity.wrapType( getType() ); + } + + + @Override + public void execute( List inputs, Map settings, ExecutionContext ctx ) throws Exception { + try ( RelWriter writer = ctx.createRelWriter( 0, getType(), true ) ) { + writer.write( getValues().iterator() ); + } + } + + + @Override + public void reset() { + + } + + + private static AlgDataType getType() { + AlgDataTypeFactory typeFactory = AlgDataTypeFactory.DEFAULT; + return typeFactory.builder() + .add( null, StorageManager.PK_COL, null, typeFactory.createPolyType( PolyType.BIGINT ) ) + .add( null, "name", null, typeFactory.createPolyType( PolyType.VARCHAR, 50 ) ) + .add( null, "age", null, typeFactory.createPolyType( PolyType.INTEGER ) ) + .add( null, "gender", null, typeFactory.createPolyType( PolyType.VARCHAR, 1 ) ) + .build(); + } + + + private static List> getValues() { + List> tuples = new ArrayList<>(); + tuples.add( getRow( "Alice", 25, true ) ); + tuples.add( getRow( "Bob", 30, false ) ); + tuples.add( getRow( "Charlie", 35, false ) ); + return tuples; + } + + + private static List getRow( String name, int age, boolean isFemale ) { + return List.of( PolyInteger.of( 0 ), PolyString.of( name ), PolyInteger.of( age ), PolyString.of( isFemale ? "F" : "M" ) ); + } + +} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/annotations/ActivityDefinition.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/annotations/ActivityDefinition.java index 92dba65c3a..76c4aeb9cf 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/annotations/ActivityDefinition.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/annotations/ActivityDefinition.java @@ -47,7 +47,7 @@ String description() default ""; - boolean isOptional() default false; + boolean isOptional() default false; // TODO: isOptional currently has no effect } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/DataEdge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/DataEdge.java index da86de8ffb..f959661cce 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/DataEdge.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/DataEdge.java @@ -17,6 +17,7 @@ package org.polypheny.db.workflow.dag.edges; import lombok.Getter; +import org.polypheny.db.workflow.dag.activities.Activity.PortType; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.models.EdgeModel; @@ -45,4 +46,19 @@ public boolean isEquivalent( EdgeModel model ) { return hasSameEndpoints( model ) && !model.isControl() && model.getFromPort() == fromPort && model.getToPort() == toPort; } + + public PortType getFromPortType() { + return from.getDef().getOutPortTypes()[fromPort]; + } + + + public PortType getToPortType() { + return to.getDef().getInPortTypes()[toPort]; + } + + + public boolean isCompatible() { + return getToPortType().canReadFrom( getFromPortType() ); + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java index a5ac47f42d..ef61365a01 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java @@ -79,6 +79,11 @@ public boolean isActive() { } + public boolean isIgnored() { + return false; + } + + public enum EdgeState { IDLE, ACTIVE, diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/ReadableVariableStore.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/ReadableVariableStore.java index 52b83db0c4..625152b48d 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/ReadableVariableStore.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/ReadableVariableStore.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.Map; import java.util.Optional; @@ -29,6 +30,8 @@ public interface ReadableVariableStore { JsonNode getVariable( String key ); + ObjectNode getError(); + /** * Get an unmodifiable snapshot of the underlying variables map. * diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java index 2c5335bee3..f6e46b1166 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java @@ -22,13 +22,26 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; +import org.polypheny.db.workflow.dag.edges.ControlEdge; +import org.polypheny.db.workflow.dag.edges.DataEdge; +import org.polypheny.db.workflow.dag.edges.Edge; +import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; public class VariableStore implements ReadableVariableStore, WritableVariableStore { + + public static final String ERROR_MSG_KEY = "$errorMsg"; + public static final Set RESERVED_KEYS = new HashSet<>( Set.of( + ERROR_MSG_KEY + ) ); + private static final JsonMapper mapper = new JsonMapper(); private final Map variables = new HashMap<>(); @@ -46,20 +59,30 @@ public WritableVariableStore asWritable() { @Override public void setVariable( String key, JsonNode value ) { + failIfReservedKey( key ); if ( containsVariableRef( value ) ) { throw new IllegalArgumentException( "Setting a variable containing a variable reference is not allowed" ); } variables.put( key, value ); } + @Override public void setVariable( String key, SettingValue value ) { + failIfReservedKey( key ); variables.put( key, value.toJson( mapper ) ); } + @Override + public void setError( ObjectNode value ) { + variables.put( ERROR_MSG_KEY, value ); + } + + @Override public void merge( ReadableVariableStore newStore ) { + // does NOT fail with reserved keywords variables.putAll( newStore.getVariables() ); } @@ -70,6 +93,13 @@ public void clear() { } + @Override + public void reset( ReadableVariableStore newStore ) { + clear(); + merge( newStore ); + } + + @Override public boolean contains( String key ) { return variables.containsKey( key ); @@ -82,12 +112,25 @@ public JsonNode getVariable( String key ) { } + @Override + public ObjectNode getError() { + return (ObjectNode) getVariable( ERROR_MSG_KEY ); + } + + @Override public Map getVariables() { return Map.copyOf( variables ); } + private void failIfReservedKey( String key ) { + if ( RESERVED_KEYS.contains( key ) ) { + throw new IllegalArgumentException( "Cannot use reserved key: " + key ); + } + } + + // TODO: make sure to not change the existing JsonNode public JsonNode resolveVariables( JsonNode node ) { if ( node.isObject() ) { @@ -135,14 +178,66 @@ public Map resolveVariables( Map nodes ) { @Override public Map> resolveAvailableVariables( Map nodes ) { Map> resolved = new HashMap<>(); - for (Map.Entry entry : nodes.entrySet()) { + for ( Map.Entry entry : nodes.entrySet() ) { try { - resolved.put(entry.getKey(), Optional.of(resolveVariables(entry.getValue()))); - } catch (IllegalArgumentException e) { - resolved.put(entry.getKey(), Optional.empty()); + resolved.put( entry.getKey(), Optional.of( resolveVariables( entry.getValue() ) ) ); + } catch ( IllegalArgumentException e ) { + resolved.put( entry.getKey(), Optional.empty() ); + } + } + return Collections.unmodifiableMap( resolved ); + } + + + /** + * Clears this store, then merges all incoming variableStores if their corresponding edge is not inactive and not ignored. + * Be careful, the resulting variables might not be consistent at a later point, as input stores might change. + *

+ * The merge order is the following, with later stores able to overwrite variables written before: + * 1. Stores from data inputs, ordered by their inPort + * 2. Stores from success control edges (in arbitrary order) + * 3. Stores from fail control edges (in arbitrary order) + * + * @param inEdges All input edges (data and control) to the activity + */ + public void mergeInputStores( List inEdges, int inPortCount ) { + ReadableVariableStore[] dataToMerge = new ReadableVariableStore[inPortCount]; + Set successToMerge = new HashSet<>(); + Set failToMerge = new HashSet<>(); + + for ( Edge edge : inEdges ) { + if ( edge.getState() == EdgeState.INACTIVE ) { + continue; + } + if ( edge instanceof DataEdge data ) { + dataToMerge[data.getToPort()] = data.getFrom().getVariables(); + } else if ( edge instanceof ControlEdge control ) { + if ( control.isIgnored() ) { + continue; + } + ReadableVariableStore variables = control.getFrom().getVariables(); + if ( variables == null ) { + continue; + } + + if ( control.isOnSuccess() ) { + successToMerge.add( variables ); + } else { + failToMerge.add( variables ); + } + } else { + throw new IllegalArgumentException( "Unexpected Edge type" ); + } + } + + this.clear(); + for ( ReadableVariableStore readableVariableStore : dataToMerge ) { + if ( readableVariableStore != null ) { + this.merge( readableVariableStore ); } } - return Collections.unmodifiableMap(resolved); + successToMerge.forEach( this::merge ); + failToMerge.forEach( this::merge ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/WritableVariableStore.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/WritableVariableStore.java index 49e60184f6..414ee38a11 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/WritableVariableStore.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/WritableVariableStore.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; public interface WritableVariableStore { @@ -40,6 +41,8 @@ public interface WritableVariableStore { */ void setVariable( String key, SettingValue value ); + void setError( ObjectNode value ); + /** * Merge this variableStore with the specified store. * In the case of duplicates, newStore takes priority. @@ -50,4 +53,11 @@ public interface WritableVariableStore { void clear(); + /** + * Clears the store and then sets it to newStore + * + * @param newStore the store whose values are used to populate this store after clearing it + */ + void reset( ReadableVariableStore newStore ); + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java index 76498bb3dd..55f6c866bc 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java @@ -17,12 +17,9 @@ package org.polypheny.db.workflow.engine.execution; import java.util.List; -import java.util.Map; import java.util.UUID; -import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; -import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; @@ -46,15 +43,12 @@ public DefaultExecutor( StorageManager sm, Workflow wf, UUID activityId ) { @Override void execute() throws ExecutorException { List inputs = getReaders( wrapper ); - List inputTypes = inputs.stream().map( CheckpointReader::getTupleType ).toList(); - - mergeInputVariables( wrapper.getId() ); - Map settings = wrapper.resolveSettings(); // settings before variable update ctx = new ExecutionContextImpl( wrapper, sm ); try ( CloseableList ignored = new CloseableList( inputs ) ) { - wrapper.getActivity().updateVariables( inputTypes, settings, wrapper.getVariables() ); - wrapper.getActivity().execute( inputs, settings, ctx ); + wrapper.getActivity().execute( inputs, wrapper.resolveSettings(), ctx ); + wrapper.setOutTypePreview( sm.getOptionalCheckpointTypes( wrapper.getId() ) ); + ctx.updateProgress( 1 ); // ensure progress is correct } catch ( Exception e ) { throw new ExecutorException( e ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java index 4cc35ac730..25df52a45f 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java @@ -16,25 +16,24 @@ package org.polypheny.db.workflow.engine.execution; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.List; -import java.util.UUID; import java.util.concurrent.Callable; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.dag.edges.DataEdge; import org.polypheny.db.workflow.dag.edges.Edge; -import org.polypheny.db.workflow.dag.variables.ReadableVariableStore; -import org.polypheny.db.workflow.dag.variables.WritableVariableStore; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; /** * An executor is responsible for executing a connected subgraph of a workflow. - * Predecessor activities of activities in this subgraph are guaranteed to have finished their execution (and written their results to checkpoints) + * Predecessor activities of activities in this subgraph are guaranteed to have finished their execution (and written their results to checkpoints and merged their variables) * or are itself part of this subgraph. * After the Executor gets instantiated, the workflow structure remains static at least until execution finishes. * The execution of the subgraph is atomic: it is either completely successful or fails (=throws an exception). - * An executor is responsible for updating the variables of its activities (including before execution starts). + * An executor is responsible for updating the variables of its non-leaf activities (including before execution starts) and the outTypePreview of all activities (in case the execution is successful). * An executor does not modify the state of any activities. The scheduler is responsible for this. */ public abstract class Executor implements Callable { @@ -74,27 +73,6 @@ public Void call() throws ExecutorException { } - /** - * Merges all the active (= successfully executed) input variableStores of the target activity and updates the target activity variableStore accordingly. - * In the process, the target variableStore is completely reset. - * - * @param targetId the identifier of the target activity whose variables are going to be set based on its inputs. - */ - void mergeInputVariables( UUID targetId ) { - List edges = workflow.getInEdges( targetId ); - - ActivityWrapper target = workflow.getActivity( targetId ); - WritableVariableStore targetVariables = target.getVariables(); - targetVariables.clear(); - for ( Edge edge : edges ) { - if ( edge.isActive() ) { - ReadableVariableStore inputVariables = edge.getFrom().getVariables(); - targetVariables.merge( inputVariables ); - } - } - } - - List getReaders( ActivityWrapper target ) { CheckpointReader[] inputs = new CheckpointReader[target.getDef().getInPorts().length]; for ( Edge edge : workflow.getInEdges( target.getId() ) ) { @@ -147,6 +125,15 @@ public ExecutorException( String message, Throwable cause ) { super( message, cause ); } + + public ObjectNode getVariableValue() { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode node = mapper.createObjectNode(); + node.put( "message", getMessage() ); + node.put( "cause", getCause().getMessage() ); + return node; + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java index 4f5fe1ed57..4b6b3c1aba 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java @@ -18,10 +18,10 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.apache.commons.lang3.NotImplementedException; import org.polypheny.db.algebra.AlgNode; -import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.plan.AlgCluster; import org.polypheny.db.util.graph.AttributedDirectedGraph; import org.polypheny.db.workflow.dag.Workflow; @@ -29,7 +29,6 @@ import org.polypheny.db.workflow.dag.activities.Fusable; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; -import org.polypheny.db.workflow.engine.scheduler.GraphUtils; import org.polypheny.db.workflow.engine.storage.StorageManager; /** @@ -41,18 +40,20 @@ */ public class FusionExecutor extends Executor { - private final AttributedDirectedGraph execTree; // TODO: just use Set instead? + private final AttributedDirectedGraph execTree; + private final UUID rootId; - public FusionExecutor( StorageManager sm, Workflow workflow, AttributedDirectedGraph execTree ) { + public FusionExecutor( StorageManager sm, Workflow workflow, AttributedDirectedGraph execTree, UUID rootId ) { super( sm, workflow ); this.execTree = execTree; + this.rootId = rootId; } @Override void execute() throws ExecutorException { - UUID rootId = GraphUtils.findInvertedTreeRoot( execTree ); + System.out.println( "Start execution fused tree: " + execTree ); try { // TODO: implement after PolyAlgebra is merged @@ -84,9 +85,9 @@ public void interrupt() { } - private AlgNode constructAlgNode( UUID rootId, AlgCluster cluster ) throws Exception { - ActivityWrapper wrapper = workflow.getActivity( rootId ); - List inEdges = execTree.getInwardEdges( rootId ); + private AlgNode constructAlgNode( UUID root, AlgCluster cluster ) throws Exception { + ActivityWrapper wrapper = workflow.getActivity( root ); + List inEdges = execTree.getInwardEdges( root ); AlgNode[] inputsArr = new AlgNode[wrapper.getDef().getInPorts().length]; for ( ExecutionEdge edge : inEdges ) { assert !edge.isControl() : "Execution tree for fusion must not contain control edges"; @@ -100,14 +101,16 @@ private AlgNode constructAlgNode( UUID rootId, AlgCluster cluster ) throws Excep } List inputs = List.of( inputsArr ); - mergeInputVariables( rootId ); + if ( !inEdges.isEmpty() ) { + workflow.recomputeInVariables( root ); // inner nodes should get their variables merged + } - List inTypes = inputs.stream().map( AlgNode::getTupleType ).toList(); Map settings = wrapper.resolveSettings(); Fusable activity = (Fusable) wrapper.getActivity(); - activity.updateVariables( inTypes, settings, wrapper.getVariables() ); - return activity.fuse( inputs, settings, cluster ); + AlgNode fused = activity.fuse( inputs, settings, cluster ); + wrapper.setOutTypePreview( List.of( Optional.of( fused.getTupleType() ) ) ); + return fused; } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java index 3d24fc58d6..184a148b3b 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -45,7 +46,6 @@ import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe; import org.polypheny.db.workflow.engine.execution.pipe.QueuePipe; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; -import org.polypheny.db.workflow.engine.scheduler.GraphUtils; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter; @@ -57,7 +57,8 @@ */ public class PipeExecutor extends Executor { - private final AttributedDirectedGraph execTree; // TODO: just use Set instead? + private final AttributedDirectedGraph execTree; + private final UUID rootId; private final Map outQueues = new HashMap<>(); // maps activities to their (only!) output queue private final Map> settingsSnapshot = new HashMap<>(); private final int queueCapacity; @@ -66,9 +67,10 @@ public class PipeExecutor extends Executor { private ExecutorService executor; - public PipeExecutor( StorageManager sm, Workflow workflow, AttributedDirectedGraph execTree, int queueCapacity ) { + public PipeExecutor( StorageManager sm, Workflow workflow, AttributedDirectedGraph execTree, UUID rootId, int queueCapacity ) { super( sm, workflow ); this.execTree = execTree; + this.rootId = rootId; this.queueCapacity = queueCapacity; assert execTree.vertexSet().size() > 1 : "A PipeExecutor is not suited for the execution of a single activity, since the CheckpointPipes do not check for interrupts."; @@ -145,10 +147,10 @@ public void interrupt() { } - private AlgDataType registerOutputPipes( UUID rootId ) throws Exception { - ActivityWrapper wrapper = workflow.getActivity( rootId ); + private AlgDataType registerOutputPipes( UUID root ) throws Exception { + ActivityWrapper wrapper = workflow.getActivity( root ); - List inEdges = execTree.getInwardEdges( rootId ); + List inEdges = execTree.getInwardEdges( root ); List inTypes; if ( inEdges.isEmpty() ) { // leaf node @@ -159,20 +161,21 @@ private AlgDataType registerOutputPipes( UUID rootId ) throws Exception { for ( ExecutionEdge e : inEdges ) { inTypes.add( registerOutputPipes( e.getSource() ) ); } + workflow.recomputeInVariables( root ); // inner nodes should get their variables merged } - mergeInputVariables( rootId ); - Map settings = wrapper.resolveSettings(); - settingsSnapshot.put( rootId, settings ); // store current state of settings for later use, since updateVariables might change it + settingsSnapshot.put( root, settings ); // store current state of settings for later use Pipeable activity = (Pipeable) wrapper.getActivity(); - activity.updateVariables( inTypes, settings, wrapper.getVariables() ); AlgDataType outType = activity.lockOutputType( inTypes, settings ); if ( outType != null ) { - outQueues.put( rootId, new QueuePipe( queueCapacity, outType ) ); + outQueues.put( root, new QueuePipe( queueCapacity, outType ) ); + wrapper.setOutTypePreview( List.of( Optional.of( outType ) ) ); + } else { + // we are at the actual root of the tree, and it's an activity with no outputs. + wrapper.setOutTypePreview( List.of() ); } - // else: we are at the actual root of the tree, and it's an activity with no outputs. return outType; } @@ -192,7 +195,6 @@ private OutputPipe getCheckpointWriterPipe( UUID rootId, AlgDataType rootType ) private List> getCallables() throws Exception { - UUID rootId = GraphUtils.findInvertedTreeRoot( execTree ); AlgDataType rootType = registerOutputPipes( rootId ); @@ -244,6 +246,7 @@ private Callable getCallable( ActivityWrapper wrapper, List inP } } } + ctx.updateProgress( 1 ); } return null; }; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java index b6d94a6398..3ddb243af0 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java @@ -17,13 +17,10 @@ package org.polypheny.db.workflow.engine.execution; import java.util.List; -import java.util.Map; import java.util.UUID; -import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.dag.activities.VariableWriter; -import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; @@ -43,16 +40,12 @@ public VariableWriterExecutor( StorageManager sm, Workflow workflow, UUID activi @Override void execute() throws ExecutorException { List inputs = getReaders( wrapper ); - List inputTypes = inputs.stream().map( CheckpointReader::getTupleType ).toList(); - - mergeInputVariables( wrapper.getId() ); - Map settings = wrapper.resolveSettings(); // settings before variable update ctx = new ExecutionContextImpl( wrapper, sm ); try ( CloseableList ignored = new CloseableList( inputs ) ) { VariableWriter activity = (VariableWriter) wrapper.getActivity(); - // we skip activity.updateVariables() on purpose, since variable updates are performed within execute - activity.execute( inputs, settings, ctx, wrapper.getVariables() ); + activity.execute( inputs, wrapper.resolveSettings(), ctx, wrapper.getVariables() ); + wrapper.setOutTypePreview( sm.getOptionalCheckpointTypes( wrapper.getId() ) ); } catch ( Exception e ) { throw new ExecutorException( e ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java index 1862a4aa32..f071ed44e7 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java @@ -128,7 +128,7 @@ public Transaction getTransaction() { if ( !activityWrapper.getDef().hasCategory( ActivityCategory.EXTRACT ) && !activityWrapper.getDef().hasCategory( ActivityCategory.LOAD ) ) { throw new IllegalStateException( "Only EXTRACT or LOAD activities have access to transactions" ); } - return sm.getTransaction( activityWrapper.getId(), activityWrapper.getConfig().getTransactionMode() ); + return sm.getTransaction( activityWrapper.getId(), activityWrapper.getConfig().getCommonType() ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java index c398b2cedd..2aa6c809c8 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java @@ -123,6 +123,19 @@ public boolean equals( Object obj ) { } + @Override + public String toString() { + return "ExecutionEdge{" + + "source=" + source + + ", target=" + target + + ", isControl=" + isControl + + ", fromPort=" + fromPort + + ", toPort=" + toPort + + ", onSuccess=" + onSuccess + + '}'; + } + + public static class ExecutionEdgeFactory implements AttributedDirectedGraph.AttributedEdgeFactory { @Override diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionResult.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionResult.java index b8b3f6d2f6..2a43f84abc 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionResult.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionResult.java @@ -46,6 +46,11 @@ public UUID getSessionId() { } + public UUID getRootId() { + return submission.getRootId(); + } + + public boolean isSuccess() { return exception == null; } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java index 2941f33c9b..aa1d41df64 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java @@ -20,14 +20,15 @@ import java.util.UUID; import lombok.Value; import org.polypheny.db.workflow.engine.execution.Executor; -import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction; +import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; @Value public class ExecutionSubmission { - CommonTransaction commonType; + CommonType commonType; Executor executor; Set activities; + UUID rootId; UUID sessionId; } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java index d58c3bcb0a..ad187f7e9d 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java @@ -73,7 +73,6 @@ public synchronized void startExecution( Workflow workflow, StorageManager sm, @ throw new GenericRuntimeException( "Cannot execute a workflow that is already being executed." ); } interruptedSessions.remove( sessionId ); - WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, globalWorkers, targetActivity ); List submissions = scheduler.startExecution(); if ( submissions.isEmpty() ) { @@ -120,11 +119,18 @@ public synchronized void shutdownNow() { } + public void awaitResultProcessor( long millis ) throws InterruptedException { + resultProcessor.join( millis ); + } + + private void submit( List submissions ) { for ( ExecutionSubmission submission : submissions ) { + log.info( "Submitting {}", submission ); UUID sessionId = submission.getSessionId(); completionService.submit( () -> { + log.info( "Starting actual execution {}", submission ); if ( interruptedSessions.contains( sessionId ) ) { return new ExecutionResult( submission, new ExecutorException( "Execution was interrupted before it started" ) ); } @@ -140,6 +146,7 @@ private void submit( List submissions ) { result = new ExecutionResult( submission, new ExecutorException( "Unexpected exception", e ) ); } activeSubmissions.get( sessionId ).remove( submission ); + log.info( "Finished actual execution with result {}", result ); return result; } ); } @@ -148,10 +155,12 @@ private void submit( List submissions ) { private Thread startResultProcessor() { Thread t = new Thread( () -> { + log.info( "Started ResultProcessor thread" ); while ( true ) { List nextSubmissions; try { ExecutionResult result = completionService.take().get(); + log.info( "Processing result: {}", result ); WorkflowScheduler scheduler = schedulers.get( result.getSessionId() ); nextSubmissions = scheduler.handleExecutionResult( result ); @@ -173,6 +182,7 @@ private Thread startResultProcessor() { submit( nextSubmissions ); } } + log.info( "Processor is finished" ); } ); t.start(); return t; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GraphUtils.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GraphUtils.java index 2813d4aa78..a0b6a11d90 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GraphUtils.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GraphUtils.java @@ -18,11 +18,15 @@ import java.util.Collection; import java.util.HashSet; +import java.util.LinkedList; import java.util.Objects; +import java.util.Queue; import java.util.Set; +import lombok.NonNull; import org.polypheny.db.util.graph.AttributedDirectedGraph; import org.polypheny.db.util.graph.AttributedDirectedGraph.AttributedEdgeFactory; import org.polypheny.db.util.graph.DefaultEdge; +import org.polypheny.db.util.graph.TopologicalOrderIterator; public class GraphUtils { @@ -79,4 +83,28 @@ public static AttributedDirectedGraph getInduce return subgraph; } + + /** + * Returns a TopologicalOrderIterator for the subgraph that is reachable from root (excluding root) + */ + public static Iterable getTopologicalIterable( AttributedDirectedGraph graph, @NonNull V root, boolean includeRoot ) { + Set reachable = new HashSet<>(); + Queue open = new LinkedList<>(); + + if ( includeRoot ) { + open.add( root ); + } else { + graph.getOutwardEdges( root ).forEach( e -> open.add( (V) e.target ) ); + } + while ( !open.isEmpty() ) { + V n = open.remove(); + if ( !reachable.contains( n ) ) { + reachable.add( n ); + graph.getOutwardEdges( n ).forEach( e -> open.add( (V) e.target ) ); + } + } + + return TopologicalOrderIterator.of( getInducedSubgraph( graph, reachable ) ); + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java index dd0697c4c4..eca67ba4af 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java @@ -16,6 +16,7 @@ package org.polypheny.db.workflow.engine.scheduler; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -27,24 +28,26 @@ import java.util.Set; import java.util.UUID; import javax.annotation.Nullable; -import org.apache.commons.lang3.NotImplementedException; +import lombok.extern.slf4j.Slf4j; import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.catalog.exceptions.GenericRuntimeException; +import org.polypheny.db.transaction.TransactionException; import org.polypheny.db.util.graph.AttributedDirectedGraph; -import org.polypheny.db.util.graph.TopologicalOrderIterator; import org.polypheny.db.workflow.dag.Workflow; -import org.polypheny.db.workflow.dag.activities.Activity; +import org.polypheny.db.workflow.dag.Workflow.WorkflowState; import org.polypheny.db.workflow.dag.activities.ActivityException; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState; -import org.polypheny.db.workflow.dag.edges.DataEdge; +import org.polypheny.db.workflow.dag.edges.ControlEdge; import org.polypheny.db.workflow.dag.edges.Edge; import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; +import org.polypheny.db.workflow.engine.execution.Executor.ExecutorException; import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizer; import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizer.SubmissionFactory; import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizerImpl; import org.polypheny.db.workflow.engine.storage.StorageManager; +import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; /** * The scheduler takes a workflow and a (optional) target activitiy. @@ -55,6 +58,7 @@ * WorkflowScheduler instance after its construction. * Sole control over the provided Workflow and StorageManager is given to the scheduler from creation up to the point the execution is finished. */ +@Slf4j public class WorkflowScheduler { private final Workflow workflow; @@ -63,20 +67,21 @@ public class WorkflowScheduler { private final AttributedDirectedGraph execDag; private final WorkflowOptimizer optimizer; - private boolean isExtractFinished = false; - private boolean isLoadFinished = false; private boolean isAborted; // either by interruption or failure private boolean isFinished; private int pendingCount = 0; // current number of unfinished submissions private final Set remainingActivities = new HashSet<>(); // activities that have not finished execution - private final Set pendingActivities = new HashSet<>(); // contains activities submitted for execution + private final Set remainingCommonExtract = new HashSet<>(); + private final Set remainingCommonLoad = new HashSet<>(); - private final Map>> typePreviews = new HashMap<>(); // contains the (possibly not yet known) output types of execDag activities + private final Map>> inTypePreviews = new HashMap<>(); // contains the (possibly not yet known) input types of execDag activities private final Map>> settingsPreviews = new HashMap<>(); // contains the (possibly not yet known) settings of execDag activities public WorkflowScheduler( Workflow workflow, StorageManager sm, int globalWorkers, @Nullable UUID targetActivity ) throws Exception { + log.info( "Instantiating WorkflowScheduler with target: {}", targetActivity ); + workflow.setState( WorkflowState.EXECUTING ); this.workflow = workflow; this.sm = sm; this.maxWorkers = Math.min( workflow.getConfig().getMaxWorkers(), globalWorkers ); @@ -85,12 +90,12 @@ public WorkflowScheduler( Workflow workflow, StorageManager sm, int globalWorker throw new GenericRuntimeException( "A saved activity first needs to be reset before executing it" ); } - validateStructure(); - validateCommonExtract(); - validateCommonLoad(); - this.execDag = targetActivity == null ? prepareExecutionDag() : prepareExecutionDag( List.of( targetActivity ) ); - initPreviews(); + log.info( "ExecDag after initialization: {}", this.execDag ); + + workflow.validateStructure( this.execDag ); + log.info( "Structure is valid" ); + this.optimizer = new WorkflowOptimizerImpl( workflow, execDag ); } @@ -103,30 +108,49 @@ public List startExecution() { public List handleExecutionResult( ExecutionResult result ) { pendingCount--; - remainingActivities.removeAll( result.getActivities() ); + + try { + updateRemaining( result ); + } catch ( TransactionException e ) { + result = new ExecutionResult( result.getSubmission(), new ExecutorException( "An error occurred while closing open transactions of executed activities", e ) ); + } + + if ( !result.isSuccess() ) { + Throwable cause = result.getException().getCause(); + + // for debugging + result.getException().printStackTrace(); + if ( cause != null ) { + log.warn( "ExecutorException has inner exception", cause ); + } + setErrorVariable( result.getActivities(), result.getException() ); + } + + updateGraph( result.isSuccess(), result.getActivities(), result.getRootId(), execDag ); if ( remainingActivities.isEmpty() ) { assert pendingCount == 0; isFinished = true; + workflow.setState( WorkflowState.IDLE ); return null; } if ( isAborted ) { if ( pendingCount == 0 ) { isFinished = true; + workflow.setState( WorkflowState.IDLE ); } return null; } - propagateResult( result.isSuccess(), result.getActivities() ); - return computeNextSubmissions(); } public void interruptExecution() { isAborted = true; + workflow.setState( WorkflowState.INTERRUPTED ); } @@ -159,7 +183,7 @@ private AttributedDirectedGraph prepareExecutionDag() throw private AttributedDirectedGraph prepareExecutionDag( List targets ) throws Exception { if ( targets.isEmpty() ) { - throw new GenericRuntimeException( "Cannot prepare executionDag for no targets" ); + throw new GenericRuntimeException( "Cannot prepare executionDag for empty targets" ); } Set savedActivities = new HashSet<>(); Queue open = new LinkedList<>( targets ); @@ -174,119 +198,203 @@ private AttributedDirectedGraph prepareExecutionDag( List execute all of them again or only new ones, or fail? if ( nWrapper.getState() == ActivityState.SAVED ) { savedActivities.add( n ); continue; } + nWrapper.resetExecution(); nWrapper.setState( ActivityState.QUEUED ); - for ( Edge edge : workflow.getInEdges( n ) ) { + remainingActivities.add( n ); + if ( type == CommonType.EXTRACT ) { + remainingCommonExtract.add( n ); + } else if ( type == CommonType.LOAD ) { + remainingCommonLoad.add( n ); + } + + List inEdges = workflow.getInEdges( n ); + for ( Edge edge : inEdges ) { edge.setState( EdgeState.IDLE ); open.add( edge.getFrom().getId() ); } + if ( inEdges.isEmpty() ) { + // TODO: also initialize any activities that are not successors to SAVED activity + workflow.recomputeInVariables( n ); + inTypePreviews.put( n, List.of() ); + try { + Map> settings = nWrapper.updateOutTypePreview( List.of(), true ); + settingsPreviews.put( n, settings ); + } catch ( ActivityException e ) { + // TODO: detected an inconsistency in the types and settings. Ignore? + e.printStackTrace(); + } + + } } AttributedDirectedGraph execDag = GraphUtils.getInducedSubgraph( workflow.toDag(), visited ); // handle saved activities (= simulate that they finish their execution successfully) for ( UUID saved : savedActivities ) { - updateGraph( true, Set.of( saved ), execDag ); // result propagation needs to happen individually + updateGraph( true, Set.of( saved ), saved, execDag ); // result propagation needs to happen individually } return execDag; } - private void validateStructure() throws Exception { - // no cycles - // compatible DataModels for edges - // compatible settings - // TODO: verify succesors of idle nodes are idle as well - // TODO: ensure all nodes to be executed have an empty variable store + private void updateRemaining( ExecutionResult result ) throws TransactionException { + CommonType type = result.getSubmission().getCommonType(); + Set activities = result.getActivities(); + remainingActivities.removeAll( activities ); + + if ( type == CommonType.NONE ) { + if ( result.isSuccess() ) { + activities.forEach( sm::commitTransaction ); + } else { + activities.forEach( sm::rollbackTransaction ); + } + } else { + Set remainingCommon = type == CommonType.EXTRACT ? remainingCommonExtract : remainingCommonLoad; + remainingCommon.removeAll( activities ); + if ( result.isSuccess() ) { + if ( remainingCommon.isEmpty() ) { + sm.commitCommonTransaction( type ); + } + } else { + for ( UUID n : remainingCommon ) { + ActivityWrapper wrapper = workflow.getActivity( n ); + if ( wrapper.getState() == ActivityState.QUEUED ) { + wrapper.setState( ActivityState.SKIPPED ); // TODO: only skip later when updating graph? + } + } + remainingCommon.clear(); + sm.rollbackCommonTransaction( type ); // TODO: only roll back when all executing common have finished? + } + } } - private void validateCommonExtract() throws Exception { - isExtractFinished = true; // TODO: only if no common extract activities present + private List computeNextSubmissions() { + List factories = optimizer.computeNextTrees( inTypePreviews, settingsPreviews, maxWorkers - pendingCount, getActiveCommonType() ); + pendingCount += factories.size(); + List submissions = factories.stream().map( f -> f.create( sm, workflow ) ).toList(); + for ( ExecutionSubmission submission : submissions ) { + setStates( submission.getActivities(), ActivityState.EXECUTING ); + } + + return submissions; } - private void validateCommonLoad() throws Exception { - isLoadFinished = true; // TODO: only if no common load activities present + private void setErrorVariable( Set activities, ExecutorException exception ) { + ObjectNode value = exception.getVariableValue(); + for ( UUID n : activities ) { + workflow.getActivity( n ).getVariables().setError( value ); + } } - private void initPreviews() throws ActivityException { - for ( UUID n : TopologicalOrderIterator.of( execDag ) ) { + private void updateGraph( boolean isSuccess, Set activities, UUID rootId, AttributedDirectedGraph dag ) { + // must not access this.execDag as it might be null at this point + // TODO: any not yet executed activity whose input edges are all either Active or Inactive should have their variableStores updated -> reduce number of empty optionals in previews / canFuse etc. + ActivityWrapper root = workflow.getActivity( rootId ); + boolean isInitialUpdate = root.getState() == ActivityState.SAVED; + for ( UUID n : activities ) { ActivityWrapper wrapper = workflow.getActivity( n ); - Activity activity = wrapper.getActivity(); - ActivityState state = wrapper.getState(); - - if ( state == ActivityState.SAVED ) { - // settings are not required for already executed nodes - typePreviews.put( n, sm.getTupleTypes( n ).stream().map( Optional::ofNullable ).toList() ); - - } else if ( state == ActivityState.IDLE ) { - List> inputTypes = new ArrayList<>(); - boolean allInputsSaved = true; - for ( int i = 0; i < wrapper.getDef().getInPorts().length; i++ ) { - DataEdge dataEdge = workflow.getDataEdge( n, i ); - ActivityWrapper inWrapper = dataEdge.getFrom(); - if ( remainingActivities.contains( inWrapper.getId() ) ) { - allInputsSaved = false; - } - inputTypes.add( typePreviews.get( inWrapper.getId() ).get( dataEdge.getFromPort() ) ); + if ( !isInitialUpdate ) { + if ( isSuccess ) { + wrapper.setState( n == rootId ? ActivityState.SAVED : ActivityState.FINISHED ); + } else { + wrapper.setState( ActivityState.FAILED ); } - // TODO: ensure control inputs are also saved, then merge variables correctly - // Also change executor merge to be correct (correct order, only active) - - Map> settings = wrapper.resolveAvailableSettings(); - settingsPreviews.put( n, settings ); - typePreviews.put( n, activity.previewOutTypes( inputTypes, settings ) ); - - } else { - throw new IllegalStateException( "Illegal state of activity while initiating scheduler: " + state + " for " + n ); } - switch ( state ) { - - case IDLE -> { + for ( ExecutionEdge execEdge : dag.getOutwardEdges( n ) ) { + Edge edge = workflow.getEdge( execEdge ); + if ( activities.contains( execEdge.getTarget() ) ) { + edge.setState( isSuccess ? EdgeState.ACTIVE : EdgeState.INACTIVE ); + } else { + assert edge.isIgnored() || !edge.getTo().getState().isExecuted() : + "Encountered an activity that was executed before its predecessors: " + edge.getTo(); + + boolean isActive = isSuccess ? + !(edge instanceof ControlEdge control) || control.isOnSuccess() : + (edge instanceof ControlEdge control && !control.isOnSuccess()); + propagateResult( isActive, edge, dag ); } - case QUEUED -> { - } - case EXECUTING -> { - } - case SKIPPED -> { - } - case FAILED -> { - } - case FINISHED -> { - } - case SAVED -> { + } + } + + // recompute successor typePreviews and settings + // TODO: update entire workflow instead of dag? + for ( UUID n : GraphUtils.getTopologicalIterable( dag, rootId, false ) ) { + ActivityWrapper wrapper = workflow.getActivity( n ); + if ( isInitialUpdate && inTypePreviews.containsKey( n ) ) { + continue; // only initialize once + } + if ( wrapper.getState() == ActivityState.QUEUED ) { + workflow.recomputeInVariables( n ); + List> inTypes = workflow.getInputTypes( n ); + inTypePreviews.put( n, inTypes ); + try { + Map> settings = wrapper.updateOutTypePreview( inTypes, workflow.hasStableInVariables( n ) ); + settingsPreviews.put( n, settings ); + } catch ( ActivityException e ) { + // TODO: detected an inconsistency in the types and settings. Ignore? + e.printStackTrace(); } } } } - private List computeNextSubmissions() { - // TODO: determine previews + private void propagateResult( boolean isActive, Edge edge, AttributedDirectedGraph dag ) { + // must not access this.execDag as it might be null at this point + edge.setState( isActive ? EdgeState.ACTIVE : EdgeState.INACTIVE ); + if ( edge.isIgnored() || (isActive && !(edge instanceof ControlEdge)) ) { + return; // Cannot propagate activation of a data edge or ignored edge, since it cannot change the state of the activity + } + ActivityWrapper target = edge.getTo(); + assert !target.getState().isExecuted() : "Encountered an activity that was executed before its predecessors: " + target; - List factories = optimizer.computeNextTrees( null, null, maxWorkers - pendingCount, null ); - pendingCount += factories.size(); - return factories.stream().map( f -> f.create( sm, workflow ) ).toList(); + List inEdges = workflow.getInEdges( target.getId() ); + EdgeState canExecute = target.canExecute( inEdges ); + switch ( canExecute ) { + case IDLE -> { + } + case ACTIVE -> { + List controlEdges = inEdges.stream().filter( e -> e instanceof ControlEdge ).map( e -> (ControlEdge) e ).toList(); + if ( !controlEdges.isEmpty() ) { + controlEdges.forEach( e -> e.setIgnored( true ) ); // even the active edges can be ignored, since they are not needed anymore + } + } + case INACTIVE -> { + target.setState( ActivityState.SKIPPED ); + remainingActivities.remove( target.getId() ); + remainingCommonExtract.remove( target.getId() ); // TODO: what if a common activity is skipped? workflow config? + remainingCommonLoad.remove( target.getId() ); + // a skipped activity does NOT count as failed -> onFail control edges also become INACTIVE + workflow.getOutEdges( target.getId() ).forEach( e -> propagateResult( false, e, dag ) ); + } + } } - private void propagateResult( boolean isSuccess, Set activities ) { - throw new NotImplementedException(); + private CommonType getActiveCommonType() { + if ( !remainingCommonExtract.isEmpty() ) { + return CommonType.EXTRACT; + } + if ( remainingActivities.size() > remainingCommonLoad.size() ) { + return CommonType.NONE; + } + return CommonType.LOAD; } - private void updateGraph( boolean isSuccess, Set activities, AttributedDirectedGraph dag ) { - // does not access this.execDag - // TODO: any not yet executed activity whose input edges are all either Active or Inactive should have their variableStores updated -> reduce number of empty optionals in previews / canFuse etc. - throw new NotImplementedException(); + private void setStates( Set activities, ActivityState state ) { + activities.forEach( id -> workflow.getActivity( id ).setState( state ) ); } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java index f77a2963e7..bb1586261b 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java @@ -24,7 +24,6 @@ import java.util.UUID; import lombok.Getter; import org.polypheny.db.algebra.type.AlgDataType; -import org.polypheny.db.catalog.exceptions.GenericRuntimeException; import org.polypheny.db.util.graph.AttributedDirectedGraph; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.activities.ActivityWrapper; @@ -32,8 +31,6 @@ import org.polypheny.db.workflow.dag.activities.Fusable; import org.polypheny.db.workflow.dag.activities.Pipeable; import org.polypheny.db.workflow.dag.activities.VariableWriter; -import org.polypheny.db.workflow.dag.edges.Edge; -import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue; import org.polypheny.db.workflow.engine.execution.DefaultExecutor; import org.polypheny.db.workflow.engine.execution.Executor; @@ -46,25 +43,31 @@ import org.polypheny.db.workflow.engine.scheduler.ExecutionSubmission; import org.polypheny.db.workflow.engine.scheduler.GraphUtils; import org.polypheny.db.workflow.engine.storage.StorageManager; -import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction; +import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; +import org.polypheny.db.workflow.models.WorkflowConfigModel; public abstract class WorkflowOptimizer { final Workflow workflow; final AttributedDirectedGraph execDag; + final boolean isFusionEnabled; + final boolean isPipelineEnabled; - Map>> typePreviews; + Map>> inTypePreviews; Map>> settingsPreviews; protected WorkflowOptimizer( Workflow workflow, AttributedDirectedGraph execDag ) { this.workflow = workflow; this.execDag = execDag; + WorkflowConfigModel config = workflow.getConfig(); + isFusionEnabled = config.isFusionEnabled(); + isPipelineEnabled = config.isPipelineEnabled(); } - public final List computeNextTrees( Map>> typePreviews, Map>> settingsPreviews, int submissionCount, CommonTransaction commonType ) { - this.typePreviews = typePreviews; + public final List computeNextTrees( Map>> inTypePreviews, Map>> settingsPreviews, int submissionCount, CommonType commonType ) { + this.inTypePreviews = inTypePreviews; this.settingsPreviews = settingsPreviews; List orderedCandidates = computeNextTrees( commonType ); @@ -80,7 +83,7 @@ public final List computeNextTrees( Map computeNextTrees( CommonTransaction commonType ); + abstract List computeNextTrees( CommonType commonType ); /** @@ -91,7 +94,7 @@ public final List computeNextTrees( Map return true if empty Optional - return writer.requestsToWrite( typePreviews.get( activityId ), settingsPreviews.get( activityId ) ).orElse( true ); + return writer.requestsToWrite( inTypePreviews.get( activityId ), settingsPreviews.get( activityId ) ).orElse( true ); } return false; } @@ -124,11 +127,11 @@ ActivityState getState( UUID activityId ) { } - AttributedDirectedGraph getCommonSubExecDag( CommonTransaction commonType ) { + AttributedDirectedGraph getCommonSubExecDag( CommonType commonType ) { Set nodes = new HashSet<>(); for ( UUID n : execDag.vertexSet() ) { ActivityWrapper wrapper = workflow.getActivity( n ); - if ( wrapper.getConfig().getTransactionMode() == commonType ) { + if ( wrapper.getConfig().getCommonType() == commonType ) { nodes.add( n ); } } @@ -136,31 +139,16 @@ AttributedDirectedGraph getCommonSubExecDag( CommonTransact } - Edge getEdge( ExecutionEdge edge ) { - for ( Edge candidate : workflow.getEdges( edge.getSource(), edge.getTarget() ) ) { - if ( edge.representsEdge( candidate ) ) { - return candidate; - } - } - throw new IllegalArgumentException( "Cannot return Edge of ExecutionEdge that is not part of the workflow: " + edge ); - } - - - EdgeState getEdgeState( ExecutionEdge edge ) { - return getEdge( edge ).getState(); - } - - @Getter public static class SubmissionFactory { private final AttributedDirectedGraph tree; private final Set activities; private final ExecutorType executorType; - private final CommonTransaction commonType; + private final CommonType commonType; - public SubmissionFactory( AttributedDirectedGraph tree, Set activities, ExecutorType executorType, CommonTransaction commonType ) { + public SubmissionFactory( AttributedDirectedGraph tree, Set activities, ExecutorType executorType, CommonType commonType ) { this.tree = tree; this.activities = activities; this.executorType = executorType; @@ -168,7 +156,7 @@ public SubmissionFactory( AttributedDirectedGraph tree, Set } - public SubmissionFactory( UUID activity, ExecutorType executorType, CommonTransaction commonType ) { + public SubmissionFactory( UUID activity, ExecutorType executorType, CommonType commonType ) { this.executorType = executorType; this.activities = Set.of( activity ); this.commonType = commonType; @@ -178,22 +166,23 @@ public SubmissionFactory( UUID activity, ExecutorType executorType, CommonTransa public ExecutionSubmission create( StorageManager sm, Workflow wf ) { + UUID root = getRootActivity(); // root of inverted tree Executor executor = switch ( executorType ) { - case DEFAULT -> new DefaultExecutor( sm, wf, getActivity() ); - case FUSION -> new FusionExecutor( sm, wf, tree ); - case PIPE -> new PipeExecutor( sm, wf, tree, 1000 ); - case VARIABLE_WRITER -> new VariableWriterExecutor( sm, wf, getActivity() ); + case DEFAULT -> new DefaultExecutor( sm, wf, root ); + case FUSION -> new FusionExecutor( sm, wf, tree, root ); + case PIPE -> new PipeExecutor( sm, wf, tree, root, 1000 ); // TODO: use value from config + case VARIABLE_WRITER -> new VariableWriterExecutor( sm, wf, getRootActivity() ); }; - return new ExecutionSubmission( commonType, executor, activities, sm.getSessionId() ); + return new ExecutionSubmission( commonType, executor, activities, root, sm.getSessionId() ); } - private UUID getActivity() { - if ( activities.size() != 1 ) { - throw new GenericRuntimeException( "Invalid number of activities: " + activities.size() ); + private UUID getRootActivity() { + if ( activities.size() == 1 ) { + return activities.iterator().next(); } - return activities.iterator().next(); + return GraphUtils.findInvertedTreeRoot( tree ); } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java index a2bc0d259a..c64c7b0d21 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java @@ -32,14 +32,13 @@ import org.polypheny.db.util.graph.TopologicalOrderIterator; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState; -import org.polypheny.db.workflow.dag.edges.ControlEdge; import org.polypheny.db.workflow.dag.edges.Edge; import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; import org.polypheny.db.workflow.engine.execution.Executor.ExecutorType; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory; import org.polypheny.db.workflow.engine.scheduler.GraphUtils; -import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction; +import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; public class WorkflowOptimizerImpl extends WorkflowOptimizer { @@ -50,7 +49,7 @@ public WorkflowOptimizerImpl( Workflow workflow, AttributedDirectedGraph computeNextTrees( CommonTransaction commonType ) { + public List computeNextTrees( CommonType commonType ) { AttributedDirectedGraph subDag = AttributedDirectedGraph.create( new ExecutionEdgeFactory() ); Map nodeColors = new HashMap<>(); Map edgeColors = new HashMap<>(); @@ -58,8 +57,12 @@ public List computeNextTrees( CommonTransaction commonType ) // order determines priority if an activity implements multiple interfaces determineVariableWriters( subDag, nodeColors, edgeColors ); - determineFusions( subDag, nodeColors, edgeColors ); - determinePipes( subDag, nodeColors, edgeColors ); + if ( isFusionEnabled ) { + determineFusions( subDag, nodeColors, edgeColors ); + } + if ( isPipelineEnabled ) { + determinePipes( subDag, nodeColors, edgeColors ); + } System.out.println( "\nSub-DAG: " + subDag ); System.out.println( "Node Colors: " + nodeColors ); @@ -88,10 +91,10 @@ private void initializeSubDag( AttributedDirectedGraph base continue; // edge to an activity that was already aborted } - Edge edgeData = getEdge( edge ); + Edge edgeData = workflow.getEdge( edge ); assert edgeData.getState() == EdgeState.IDLE : "Encountered edge of queued or executing activity that is not idle: " + edge; - if ( edgeData instanceof ControlEdge control && control.isIgnored() ) { + if ( edgeData.isIgnored() ) { continue; // control edges that are no longer required are not added to the subDag } @@ -254,7 +257,7 @@ private List createFactories( AttributedDirectedGraph getTupleTypes( UUID activityId ); + List getCheckpointTypes( UUID activityId ); + + List> getOptionalCheckpointTypes( UUID activityId ); /** * Creates a relational checkpoint for an activity output and returns a RelWriter for that checkpoint. @@ -88,7 +92,7 @@ public interface StorageManager extends AutoCloseable { // TODO: remove AutoClos * @param commonType whether to return a common transaction for the specified common type or return a transaction only for this activity * @return the transaction for the activity */ - Transaction getTransaction( UUID activityId, CommonTransaction commonType ); + Transaction getTransaction( UUID activityId, CommonType commonType ); /** * If the activity has any active extract or load transaction associated with it (excluding common transactions) it will be committed. @@ -119,7 +123,7 @@ public interface StorageManager extends AutoCloseable { // TODO: remove AutoClos * * @param commonType which common transaction to commit */ - void commitCommonTransaction( @NonNull CommonTransaction commonType ); + void commitCommonTransaction( @NonNull ActivityConfigModel.CommonType commonType ); /** * Aborts the common transaction of the given type (either EXTRACT or LOAD). @@ -129,6 +133,6 @@ public interface StorageManager extends AutoCloseable { // TODO: remove AutoClos * * @param commonType which common transaction to roll back */ - void rollbackCommonTransaction( @NonNull CommonTransaction commonType ); + void rollbackCommonTransaction( @NonNull ActivityConfigModel.CommonType commonType ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java index 7b8358ab95..d5ee66538c 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; @@ -56,7 +57,8 @@ import org.polypheny.db.workflow.engine.storage.writer.DocWriter; import org.polypheny.db.workflow.engine.storage.writer.LpgWriter; import org.polypheny.db.workflow.engine.storage.writer.RelWriter; -import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction; +import org.polypheny.db.workflow.models.ActivityConfigModel; +import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; public class StorageManagerImpl implements StorageManager { @@ -145,7 +147,7 @@ public AlgDataType getTupleType( UUID activityId, int outputIdx ) { @Override - public List getTupleTypes( UUID activityId ) { + public List getCheckpointTypes( UUID activityId ) { List types = new ArrayList<>(); Map outputs = checkpoints.get( activityId ); for ( int i = 0; i < outputs.size(); i++ ) { @@ -156,6 +158,12 @@ public List getTupleTypes( UUID activityId ) { } + @Override + public List> getOptionalCheckpointTypes( UUID activityId ) { + return getCheckpointTypes( activityId ).stream().map( Optional::ofNullable ).toList(); + } + + @Override public RelWriter createRelCheckpoint( UUID activityId, int outputIdx, AlgDataType type, boolean resetPk, @Nullable String storeName ) { if ( storeName == null || storeName.isEmpty() ) { @@ -238,7 +246,7 @@ public boolean hasCheckpoint( UUID activityId, int outputIdx ) { @Override - public Transaction getTransaction( UUID activityId, CommonTransaction commonType ) { + public Transaction getTransaction( UUID activityId, CommonType commonType ) { return switch ( commonType ) { case NONE -> localTransactions.computeIfAbsent( activityId, id -> QueryUtils.startTransaction( Catalog.defaultNamespaceId, "LocalTx" ) ); case EXTRACT -> extractTransaction; @@ -271,9 +279,9 @@ public void startCommonTransactions() { @Override - public void commitCommonTransaction( @NonNull CommonTransaction commonType ) { - assert commonType != CommonTransaction.NONE; - Transaction t = commonType == CommonTransaction.EXTRACT ? extractTransaction : loadTransaction; + public void commitCommonTransaction( @NonNull ActivityConfigModel.CommonType commonType ) { + assert commonType != CommonType.NONE; + Transaction t = commonType == CommonType.EXTRACT ? extractTransaction : loadTransaction; if ( t.isActive() ) { t.commit(); } @@ -281,9 +289,9 @@ public void commitCommonTransaction( @NonNull CommonTransaction commonType ) { @Override - public void rollbackCommonTransaction( @NonNull CommonTransaction commonType ) { - assert commonType != CommonTransaction.NONE; - Transaction t = commonType == CommonTransaction.EXTRACT ? extractTransaction : loadTransaction; + public void rollbackCommonTransaction( @NonNull ActivityConfigModel.CommonType commonType ) { + assert commonType != CommonType.NONE; + Transaction t = commonType == CommonType.EXTRACT ? extractTransaction : loadTransaction; if ( t != null && t.isActive() ) { t.rollback( null ); } @@ -390,8 +398,8 @@ public void close() throws Exception { // In practice, calling close for closing transactions is not required, since the transactions should be closed manually assert extractTransaction == null || !extractTransaction.isActive() || extractTransaction.getNumberOfStatements() == 0 : "Common extract transaction should get explicitly committed or aborted"; assert loadTransaction == null || !loadTransaction.isActive() || loadTransaction.getNumberOfStatements() == 0 : "Common load transaction should get explicitly committed or aborted"; - rollbackCommonTransaction( CommonTransaction.EXTRACT ); - rollbackCommonTransaction( CommonTransaction.LOAD ); + rollbackCommonTransaction( CommonType.EXTRACT ); + rollbackCommonTransaction( CommonType.LOAD ); for ( Transaction t : localTransactions.values() ) { assert !t.isActive() : "local transactions should get explicitly committed or aborted"; t.rollback( null ); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java index c84267e8fd..0cce3ceada 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java @@ -175,7 +175,7 @@ public Iterator> getIteratorFromQuery( CheckpointQuery query, Li Statement statement = transaction.createStatement(); Pair parsed = QueryUtils.parseAndTranslateQuery( context, statement ); - if ( !QueryUtils.validateAlg( parsed.right, false, List.of( entity ) ) ) { + if ( !QueryUtils.validateAlg( parsed.right, false, entities ) ) { throw new GenericRuntimeException( "The specified query is not permitted: " + queryStr ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java index 86ff4f43c7..c6831ea430 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java @@ -30,7 +30,7 @@ public class ActivityConfigModel { String[] preferredStores; // one entry per output @JsonProperty(required = true) - CommonTransaction transactionMode; + CommonType commonType; ControlStateMerger controlStateMerger; @@ -52,11 +52,11 @@ public String getPreferredStore( int outputIdx ) { public static ActivityConfigModel of() { - return new ActivityConfigModel( false, null, CommonTransaction.NONE, ControlStateMerger.AND_AND ); + return new ActivityConfigModel( false, null, CommonType.NONE, ControlStateMerger.AND_AND ); } - public enum CommonTransaction { + public enum CommonType { // TODO: scheduler must take EXTRACT and LOAD "transactions" into account (these activities are executed first / last and succeed or fail atomically). LOAD can only start when EXTRACT has committed. // Any predecessor of a EXTRACT must also be a EXTRACT // Any successor of a LOAD must also be a LOAD diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java index 4878743be2..76df157e15 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java @@ -20,10 +20,13 @@ import com.fasterxml.jackson.databind.JsonNode; import java.util.Map; import java.util.UUID; +import lombok.AllArgsConstructor; import lombok.Value; +import org.polypheny.db.workflow.dag.activities.ActivityRegistry; import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState; @Value +@AllArgsConstructor public class ActivityModel { String type; @@ -35,4 +38,24 @@ public class ActivityModel { @JsonInclude(JsonInclude.Include.NON_NULL) // do not serialize in static version ActivityState state; + + public ActivityModel( String type ) { + this( type, UUID.randomUUID() ); + } + + + public ActivityModel( String type, UUID id ) { + this( type, id, ActivityRegistry.getSerializableSettingValues( type ), ActivityConfigModel.of(), RenderModel.of(), null ); + } + + + public ActivityModel( String type, UUID id, Map settings, ActivityConfigModel config, RenderModel rendering ) { + this.type = type; + this.id = id; + this.settings = settings; + this.config = config; + this.rendering = rendering; + this.state = null; + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/EdgeModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/EdgeModel.java index 13c2998c70..20707b60a1 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/EdgeModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/EdgeModel.java @@ -24,6 +24,7 @@ import lombok.Getter; import lombok.Value; import org.polypheny.db.util.Pair; +import org.polypheny.db.workflow.dag.edges.ControlEdge; import org.polypheny.db.workflow.dag.edges.Edge.EdgeState; @Value @@ -52,4 +53,19 @@ public Pair toPair() { return Pair.of( fromId, toId ); } + + public static EdgeModel of( ActivityModel from, ActivityModel to ) { + return new EdgeModel( from.getId(), to.getId(), 0, 0, false, null ); + } + + + public static EdgeModel of( ActivityModel from, ActivityModel to, int toPort ) { + return new EdgeModel( from.getId(), to.getId(), 0, toPort, false, null ); + } + + + public static EdgeModel of( ActivityModel from, ActivityModel to, boolean onSuccess ) { + return new EdgeModel( from.getId(), to.getId(), onSuccess ? ControlEdge.SUCCESS_PORT : ControlEdge.FAIL_PORT, 0, true, null ); + } + } diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java new file mode 100644 index 0000000000..d33b71024c --- /dev/null +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java @@ -0,0 +1,73 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.polypheny.db.util.graph.TopologicalOrderIterator; +import org.polypheny.db.workflow.dag.Workflow; +import org.polypheny.db.workflow.dag.WorkflowImpl; +import org.polypheny.db.workflow.models.ActivityModel; +import org.polypheny.db.workflow.models.EdgeModel; +import org.polypheny.db.workflow.models.WorkflowConfigModel; +import org.polypheny.db.workflow.models.WorkflowModel; + +public class WorkflowUtils { + + public static Workflow getWorkflow( List activities, List edges ) { + WorkflowConfigModel config = WorkflowConfigModel.of(); + return WorkflowImpl.fromModel( new WorkflowModel( activities, edges, config, null ) ); + } + + + public static Workflow getWorkflow1() { + List activities = List.of( + new ActivityModel( "relValues" ), + new ActivityModel( "debug" ) + ); + List edges = List.of( + EdgeModel.of( activities.get( 0 ), activities.get( 1 ) ) + ); + return getWorkflow( activities, edges ); + } + + + public static Workflow getUnionWorkflow() { + List activities = List.of( + new ActivityModel( "relValues" ), + new ActivityModel( "relValues" ), + new ActivityModel( "relUnion" ) + ); + List edges = List.of( + EdgeModel.of( activities.get( 0 ), activities.get( 2 ), 0 ), + EdgeModel.of( activities.get( 1 ), activities.get( 2 ), 1 ), + EdgeModel.of( activities.get( 0 ), activities.get( 1 ), true ) // ensure consistent ordering + ); + return getWorkflow( activities, edges ); + } + + + public static List getTopologicalActivityIds( Workflow workflow ) { + List list = new ArrayList<>(); + for ( UUID n : TopologicalOrderIterator.of( workflow.toDag() ) ) { + list.add( n ); + } + return list; + } + +} diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java new file mode 100644 index 0000000000..ea445662f4 --- /dev/null +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.engine.scheduler; + +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.workflow.WorkflowUtils; +import org.polypheny.db.workflow.dag.Workflow; +import org.polypheny.db.workflow.engine.storage.StorageManager; +import org.polypheny.db.workflow.engine.storage.StorageManagerImpl; +import org.polypheny.db.workflow.engine.storage.StorageUtils; + +class GlobalSchedulerTest { + + private static UUID sessionId; + private static StorageManager sm; + private static TestHelper testHelper; + private static GlobalScheduler scheduler; + + + @BeforeAll + public static void start() throws SQLException { + testHelper = TestHelper.getInstance(); + StorageUtils.addHsqldbLocksStore( "locks" ); + scheduler = GlobalScheduler.getInstance(); + } + + + @BeforeEach + public void init() { + sessionId = UUID.randomUUID(); + sm = new StorageManagerImpl( sessionId, StorageUtils.getDefaultStoreMap( "locks" ) ); + } + + + @AfterEach + public void cleanup() { + try { + sm.close(); + } catch ( Exception e ) { + throw new RuntimeException( e ); + } + } + + + @Test + void executeSimpleWorkflowTest() throws Exception { + Workflow workflow = WorkflowUtils.getWorkflow1(); + List ids = WorkflowUtils.getTopologicalActivityIds( workflow ); + scheduler.startExecution( workflow, sm, ids.get( 1 ) ); + scheduler.awaitResultProcessor( 5000 ); + + System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) ); + } + + + @Test + void executeUnionWorkflowTest() throws Exception { + Workflow workflow = WorkflowUtils.getUnionWorkflow(); + List ids = WorkflowUtils.getTopologicalActivityIds( workflow ); + scheduler.startExecution( workflow, sm, ids.get( 2 ) ); + scheduler.awaitResultProcessor( 5000 ); + + System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 0 ), 0 ) ); + System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) ); + System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 2 ), 0 ) ); + + } + +} diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java new file mode 100644 index 0000000000..78427393fb --- /dev/null +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.engine.scheduler; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.workflow.WorkflowUtils; +import org.polypheny.db.workflow.dag.Workflow; +import org.polypheny.db.workflow.dag.Workflow.WorkflowState; +import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState; +import org.polypheny.db.workflow.engine.storage.StorageManager; +import org.polypheny.db.workflow.engine.storage.StorageManagerImpl; +import org.polypheny.db.workflow.engine.storage.StorageUtils; + +class WorkflowSchedulerTest { + + private static final UUID sessionId = UUID.randomUUID(); + private static StorageManager sm; + private static TestHelper testHelper; + + + @BeforeAll + public static void start() throws SQLException { + testHelper = TestHelper.getInstance(); + StorageUtils.addHsqldbLocksStore( "locks" ); + } + + + @BeforeEach + public void init() { + sm = new StorageManagerImpl( sessionId, StorageUtils.getDefaultStoreMap( "locks" ) ); + } + + + @AfterEach + public void cleanup() { + try { + sm.close(); + } catch ( Exception e ) { + throw new RuntimeException( e ); + } + } + + + @Test + void singleActivityTest() throws Exception { + final int globalWorkers = 1; + /*new EdgeModel( activities.get( 0 ).getId(), activities.get( 1 ).getId(), 0, 0, false, null ), + new EdgeModel( activities.get( 1 ).getId(), activities.get( 2 ).getId(), 0, 0, false, null ) + );*/ + + Workflow workflow = WorkflowUtils.getWorkflow1(); + List ids = WorkflowUtils.getTopologicalActivityIds( workflow ); + assertEquals( WorkflowState.IDLE, workflow.getState() ); + assertEquals( ActivityState.IDLE, workflow.getActivity( ids.get( 0 ) ).getState() ); + + WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, globalWorkers, ids.get( 0 ) ); + assertEquals( WorkflowState.EXECUTING, workflow.getState() ); + assertEquals( ActivityState.QUEUED, workflow.getActivity( ids.get( 0 ) ).getState() ); + assertEquals( ActivityState.IDLE, workflow.getActivity( ids.get( 1 ) ).getState() ); + + List submissions = scheduler.startExecution(); + assertEquals( Math.min( ids.size(), globalWorkers ), submissions.size() ); + System.out.println( submissions ); + ExecutionSubmission submission = submissions.get( 0 ); + + assertEquals( ids.get( 0 ), submission.getRootId() ); + assertEquals( 1, submission.getActivities().size() ); + assertEquals( ActivityState.EXECUTING, workflow.getActivity( ids.get( 0 ) ).getState() ); + try { + submission.getExecutor().call(); + } catch ( Exception e ) { + throw e; + } + System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 0 ), 0 ) ); + + scheduler.handleExecutionResult( new ExecutionResult( submission ) ); + + assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 0 ) ).getState() ); + assertEquals( WorkflowState.IDLE, workflow.getState() ); + + } + +} diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java index 9e12666fe5..884f24a24f 100644 --- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java @@ -20,9 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.sql.Connection; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -32,7 +30,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.polypheny.db.TestHelper; -import org.polypheny.db.TestHelper.JdbcConnection; import org.polypheny.db.adapter.AdapterManager; import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.algebra.type.AlgDataTypeFactory; @@ -51,15 +48,13 @@ class StorageManagerTest { private static final UUID sessionId = UUID.randomUUID(); private static TestHelper testHelper; - private static final String HSQLDB_LOCKS = "hsqldb_locks"; - private static final String HSQLDB_MVLOCKS = "hsqldb_mvlocks"; @BeforeAll public static void start() throws SQLException { testHelper = TestHelper.getInstance(); - addHsqldbStore( HSQLDB_LOCKS, "locks" ); - addHsqldbStore( HSQLDB_MVLOCKS, "mvlocks" ); + StorageUtils.addHsqldbStore( StorageUtils.HSQLDB_LOCKS, "locks" ); + StorageUtils.addHsqldbStore( StorageUtils.HSQLDB_MVLOCKS, "mvlocks" ); } @@ -188,28 +183,29 @@ void readQueryResultFromRelCheckpointTest() throws Exception { testHelper.checkAllTrxClosed(); } + @Test - @Timeout( value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD ) + @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) void createCheckpointWhileReadingTest() throws Exception { - for (String store : List.of(HSQLDB_LOCKS, HSQLDB_MVLOCKS)) { - try ( StorageManager sm = new StorageManagerImpl( sessionId, getDefaultStoreMap(store) ) ) { + for ( String store : List.of( StorageUtils.HSQLDB_LOCKS, StorageUtils.HSQLDB_MVLOCKS ) ) { + try ( StorageManager sm = new StorageManagerImpl( sessionId, StorageUtils.getDefaultStoreMap( store ) ) ) { UUID activityId1 = UUID.randomUUID(); UUID activityId2 = UUID.randomUUID(); AlgDataType type = getSampleType(); List> sampleData = getSampleData(); - System.out.println("Opening writer..."); + System.out.println( "Opening writer..." ); try ( RelWriter writer = sm.createRelCheckpoint( activityId1, 0, type, false, null ) ) { writer.write( sampleData.iterator() ); } - System.out.println("Wrote data to 1"); + System.out.println( "Wrote data to 1" ); - try (RelReader reader = (RelReader) sm.readCheckpoint( activityId1, 0 )) { + try ( RelReader reader = (RelReader) sm.readCheckpoint( activityId1, 0 ) ) { Iterator> it = reader.getIterator(); - System.out.println("Opened reader"); + System.out.println( "Opened reader" ); - try (RelWriter writer = sm.createRelCheckpoint( activityId2, 0, type, false, null )) { - System.out.println("Opened writer"); + try ( RelWriter writer = sm.createRelCheckpoint( activityId2, 0, type, false, null ) ) { + System.out.println( "Opened writer" ); while ( it.hasNext() ) { List tuple = it.next(); @@ -218,7 +214,7 @@ void createCheckpointWhileReadingTest() throws Exception { } } - System.out.println("Wrote data to 2"); + System.out.println( "Wrote data to 2" ); } testHelper.checkAllTrxClosed(); @@ -277,30 +273,4 @@ private void assertTupleEquals( List t1, List t2 ) { } } - private static void addHsqldbStore( String name, String trxControlMode ) throws SQLException { - TestHelper.executeSQL( "ALTER ADAPTERS ADD \"%s\" USING 'Hsqldb' AS 'Store'".formatted( name ) - + " WITH '{maxConnections:\"25\",trxControlMode:%s,trxIsolationLevel:read_committed,type:Memory,tableType:Memory,mode:embedded}'".formatted( trxControlMode ) - ); - } - - - private static void removeStore( String name ) throws SQLException { - try ( JdbcConnection polyphenyDbConnection = new JdbcConnection( true ) ) { - Connection connection = polyphenyDbConnection.getConnection(); - try ( Statement statement = connection.createStatement() ) { - - statement.executeUpdate( String.format( "ALTER ADAPTERS DROP \"%s\"", name ) ); - - } - } - } - - private static Map getDefaultStoreMap(String storeName) { - return Map.of( - DataModel.RELATIONAL, storeName, - DataModel.DOCUMENT, storeName, - DataModel.GRAPH, storeName - ); - } - } diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java new file mode 100644 index 0000000000..2c5f9649fd --- /dev/null +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.engine.storage; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.polypheny.db.TestHelper; +import org.polypheny.db.TestHelper.JdbcConnection; +import org.polypheny.db.catalog.logistic.DataModel; +import org.polypheny.db.type.entity.PolyValue; +import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader; + +public class StorageUtils { + + public static final String HSQLDB_LOCKS = "hsqldb_locks"; + public static final String HSQLDB_MVLOCKS = "hsqldb_mvlocks"; + + + public static void addHsqldbStore( String name, String trxControlMode ) throws SQLException { + TestHelper.executeSQL( "ALTER ADAPTERS ADD \"%s\" USING 'Hsqldb' AS 'Store'".formatted( name ) + + " WITH '{maxConnections:\"25\",trxControlMode:%s,trxIsolationLevel:read_committed,type:Memory,tableType:Memory,mode:embedded}'".formatted( trxControlMode ) + ); + } + + + public static void addHsqldbLocksStore( String name ) throws SQLException { + addHsqldbStore( name, HSQLDB_LOCKS ); + } + + + public static void removeStore( String name ) throws SQLException { + try ( JdbcConnection polyphenyDbConnection = new JdbcConnection( true ) ) { + Connection connection = polyphenyDbConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + + statement.executeUpdate( String.format( "ALTER ADAPTERS DROP \"%s\"", name ) ); + + } + } + } + + + public static Map getDefaultStoreMap( String storeName ) { + return Map.of( + DataModel.RELATIONAL, storeName, + DataModel.DOCUMENT, storeName, + DataModel.GRAPH, storeName + ); + } + + + public static List> readCheckpoint( StorageManager sm, UUID activityId, int index ) { + List> list = new ArrayList<>(); + try ( CheckpointReader reader = sm.readCheckpoint( activityId, index ) ) { + Iterator> it = reader.getIterator(); + while ( it.hasNext() ) { + list.add( it.next() ); + } + } + return list; + } + +}