Skip to content

Commit

Permalink
Improve workflow management
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 14, 2025
1 parent adcf78c commit 391c678
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.models.requests.CreateSessionRequest;
import org.polypheny.db.workflow.models.requests.RenameWorkflowRequest;
import org.polypheny.db.workflow.models.requests.SaveSessionRequest;
import org.polypheny.db.workflow.repo.WorkflowRepo;
import org.polypheny.db.workflow.repo.WorkflowRepo.WorkflowRepoException;
Expand Down Expand Up @@ -176,7 +177,7 @@ private void addSampleWorkflows() {
if ( repo.doesNameExist( fileName ) ) {
continue;
}
UUID wId = repo.createWorkflow( fileName );
UUID wId = repo.createWorkflow( fileName, "Sample Workflows" );
repo.writeVersion( wId, "Created Sample Workflow", workflow );
} catch ( IOException e ) {
throw new RuntimeException( e );
Expand Down Expand Up @@ -219,7 +220,11 @@ private void registerEndpoints() {
server.addSerializedRoute( PATH + "/sessions/{sessionId}/save", this::saveSession, HandlerType.POST );
server.addSerializedRoute( PATH + "/workflows/{workflowId}/{version}", this::openWorkflow, HandlerType.POST );

server.addSerializedRoute( PATH + "/workflows/{workflowId}", this::renameWorkflow, HandlerType.PATCH );

server.addSerializedRoute( PATH + "/sessions/{sessionId}", this::terminateSession, HandlerType.DELETE );
server.addSerializedRoute( PATH + "/workflows/{workflowId}", this::deleteWorkflow, HandlerType.DELETE );
server.addSerializedRoute( PATH + "/workflows/{workflowId}/{version}", this::deleteVersion, HandlerType.DELETE );
}


Expand Down Expand Up @@ -280,7 +285,7 @@ private void getActivityRegistry( final Context ctx ) {

private void createSession( final Context ctx ) {
CreateSessionRequest request = ctx.bodyAsClass( CreateSessionRequest.class );
process( ctx, () -> sessionManager.createUserSession( request.getName() ) );
process( ctx, () -> sessionManager.createUserSession( request.getName(), request.getGroup() ) );
}


Expand All @@ -292,6 +297,43 @@ private void openWorkflow( final Context ctx ) {
}


private void renameWorkflow( final Context ctx ) {
UUID workflowId = UUID.fromString( ctx.pathParam( "workflowId" ) );
RenameWorkflowRequest request = ctx.bodyAsClass( RenameWorkflowRequest.class );
process( ctx, () -> {
if ( request.getName() != null ) {
repo.renameWorkflow( workflowId, request.getName() );
}
if ( request.getGroup() != null ) {
repo.updateWorkflowGroup( workflowId, request.getGroup() );
}
return "success";
} );
}


private void deleteVersion( final Context ctx ) {
UUID workflowId = UUID.fromString( ctx.pathParam( "workflowId" ) );
int version = Integer.parseInt( ctx.pathParam( "version" ) );
process( ctx, () -> {
repo.deleteVersion( workflowId, version );
return "success";
} );
}


private void deleteWorkflow( final Context ctx ) {
UUID workflowId = UUID.fromString( ctx.pathParam( "workflowId" ) );
process( ctx, () -> {
if ( sessionManager.isWorkflowOpened( workflowId ) ) {
throw new WorkflowRepoException( "Cannot delete workflow while it is opened in a session", HttpCode.FORBIDDEN );
}
repo.deleteWorkflow( workflowId );
return "success";
} );
}


private void saveSession( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
SaveSessionRequest request = ctx.bodyAsClass( SaveSessionRequest.class );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ public void awaitResultProcessor( long millis ) throws InterruptedException {
}


public boolean awaitExecutionFinish( UUID sessionId, int seconds ) throws InterruptedException {
WorkflowScheduler scheduler = schedulers.get( sessionId );
if ( scheduler == null || scheduler.isFinished() ) {
return true;
}
return scheduler.awaitFinish( seconds );
}


private void submit( List<ExecutionSubmission> submissions ) {
for ( ExecutionSubmission submission : submissions ) {
log.info( "Submitting {}", submission );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.NonNull;
Expand Down Expand Up @@ -67,6 +69,7 @@ public class WorkflowScheduler {
private final AttributedDirectedGraph<UUID, ExecutionEdge> execDag;
private final WorkflowOptimizer optimizer;
private final ExecutionMonitor executionMonitor;
private CountDownLatch finishLatch;

// TODO: define overall success or failure of workflow execution, e.g. with "mustSucceed" flag in activity
private boolean isAborted; // by interruption
Expand Down Expand Up @@ -104,6 +107,7 @@ public WorkflowScheduler( Workflow workflow, StorageManager sm, ExecutionMonitor
public List<ExecutionSubmission> startExecution() {
List<ExecutionSubmission> submissions = computeNextSubmissions();
executionMonitor.forwardStates();
finishLatch = new CountDownLatch( 1 );
return submissions;
}

Expand All @@ -130,6 +134,11 @@ public boolean isFinished() {
}


public boolean awaitFinish( int seconds ) throws InterruptedException {
return finishLatch == null || finishLatch.await( seconds, TimeUnit.SECONDS );
}


public boolean isCommonActive( @NonNull CommonType commonType ) {
return sm.isCommonActive( commonType );
}
Expand Down Expand Up @@ -437,6 +446,7 @@ private void setFinished() {
workflow.setState( WorkflowState.IDLE );
executionMonitor.stop();
isFinished = true;
finishLatch.countDown();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Value;
import org.polypheny.db.workflow.dag.Workflow.WorkflowState;

@Value
@AllArgsConstructor
public class SessionModel {

SessionModelType type;
UUID sessionId; // TODO: remove redundant session id, since it's already the map key? Or send list
UUID sessionId;
int connectionCount;

// USER_SESSION fields:
UUID workflowId;
Integer version;
WorkflowDefModel workflowDef;
WorkflowState state;


public SessionModel( SessionModelType type, UUID sId, int connectionCount ) {
Expand All @@ -41,6 +43,7 @@ public SessionModel( SessionModelType type, UUID sId, int connectionCount ) {
this.workflowId = null;
this.version = null;
this.workflowDef = null;
this.state = null;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ public class WorkflowDefModel {

@Setter
private String name;
@Setter
private String group; // null or '' for default group

private final Map<Integer, VersionInfo> versions;


public WorkflowDefModel( String name ) {
public WorkflowDefModel( String name, String group ) {
this.name = name;
this.group = group;
versions = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
public class CreateSessionRequest {

String name;
String group;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019-2025 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.models.requests;

import lombok.Value;

@Value
public class RenameWorkflowRequest {

String name;
String group;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.Getter;
import org.polypheny.db.workflow.models.WorkflowDefModel;
import org.polypheny.db.workflow.models.WorkflowModel;
Expand All @@ -43,14 +44,27 @@ public interface WorkflowRepo {
WorkflowDefModel getWorkflowDef( UUID id ) throws WorkflowRepoException;

/**
* Creates a new workflow with the specified name.
* Creates a new workflow with the specified name and group.
*
* @param name the name of the new workflow to create.
* @param group the group of the new workflow or null to use the default group.
* @return the ID of the newly created workflow.
* @throws WorkflowRepoException if the workflow cannot be created, such as when a workflow with the
* same name already exists or if an error occurs during creation.
*/
UUID createWorkflow( String name ) throws WorkflowRepoException;
UUID createWorkflow( String name, @Nullable String group ) throws WorkflowRepoException;

/**
* Creates a new workflow with the specified name in the default group.
*
* @param name the name of the new workflow to create.
* @return the ID of the newly created workflow.
* @throws WorkflowRepoException if the workflow cannot be created, such as when a workflow with the
* same name already exists or if an error occurs during creation.
*/
default UUID createWorkflow( String name ) throws WorkflowRepoException {
return createWorkflow( name, null );
}

/**
* Reads a specific version of a workflow by ID.
Expand Down Expand Up @@ -102,6 +116,15 @@ public interface WorkflowRepo {
*/
void renameWorkflow( UUID id, String name ) throws WorkflowRepoException;

/**
* Changes the group of a workflow to the specified value.
*
* @param id the unique ID of the workflow.
* @param group the new group for the workflow.
* @throws WorkflowRepoException if the workflow cannot be modified, such as if an error occurs during the process.
*/
void updateWorkflowGroup( UUID id, String group ) throws WorkflowRepoException;

/**
* Checks if a workflow with the specified name already exists in the repository.
*
Expand Down Expand Up @@ -211,18 +234,18 @@ class WorkflowRepoException extends IOException {
private final HttpCode errorCode;


WorkflowRepoException( String message, Throwable cause, HttpCode errorCode ) {
public WorkflowRepoException( String message, Throwable cause, HttpCode errorCode ) {
super( message, cause );
this.errorCode = errorCode;
}


WorkflowRepoException( String message ) {
public WorkflowRepoException( String message ) {
this( message, null, HttpCode.INTERNAL_SERVER_ERROR );
}


WorkflowRepoException( String message, HttpCode errorCode ) {
public WorkflowRepoException( String message, HttpCode errorCode ) {
this( message, null, errorCode );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import org.polypheny.db.util.PolyphenyHomeDirManager;
import org.polypheny.db.workflow.models.WorkflowDefModel;
import org.polypheny.db.workflow.models.WorkflowModel;
Expand Down Expand Up @@ -98,7 +99,7 @@ public WorkflowDefModel getWorkflowDef( UUID id ) throws WorkflowRepoException {


@Override
public UUID createWorkflow( String name ) throws WorkflowRepoException {
public UUID createWorkflow( String name, @Nullable String group ) throws WorkflowRepoException {
if ( doesNameExist( name ) ) {
throw new WorkflowRepoException( "Name already exists: " + name, HttpCode.CONFLICT );
}
Expand All @@ -113,7 +114,7 @@ public UUID createWorkflow( String name ) throws WorkflowRepoException {
} catch ( SecurityException e ) {
throw new WorkflowRepoException( "Insufficient permissions to create workflow directory: " + workflowDir.getAbsolutePath(), e );
}
serializeToFile( new File( workflowDir, DEF_FILE ), new WorkflowDefModel( name ) );
serializeToFile( new File( workflowDir, DEF_FILE ), new WorkflowDefModel( name, group ) );

return id;
}
Expand Down Expand Up @@ -162,20 +163,24 @@ public void deleteWorkflow( UUID id ) throws WorkflowRepoException {

@Override
public void deleteVersion( UUID id, int version ) throws WorkflowRepoException {
WorkflowDefModel def = getWorkflowDef( id );
if ( def.getVersions().size() <= 1 ) {
throw new WorkflowRepoException( "Cannot delete the only remaining version of workflow " + def.getName(), HttpCode.FORBIDDEN );
}

if ( !doesExist( id, version ) ) {
throw new WorkflowRepoException( "Unable to delete non-existent workflow version " + id + " v" + version, HttpCode.NOT_FOUND );
throw new WorkflowRepoException( "Unable to delete non-existent workflow version " + def.getName() + " v" + version, HttpCode.NOT_FOUND );
}

File dir = getWorkflowDir( id );
File versionFile = new File( dir, version + ".json" );
if ( !versionFile.exists() ) {
throw new WorkflowRepoException( "Version file " + versionFile.getName() + " not found for workflow " + id, HttpCode.NOT_FOUND );
throw new WorkflowRepoException( "Version file " + versionFile.getName() + " not found for workflow " + def.getName(), HttpCode.NOT_FOUND );
}
if ( !versionFile.delete() ) {
throw new WorkflowRepoException( "Failed to delete version file: " + versionFile.getAbsolutePath() );
}

WorkflowDefModel def = getWorkflowDef( id );
def.removeVersion( version );
serializeToFile( new File( dir, DEF_FILE ), def );
}
Expand All @@ -192,7 +197,17 @@ public void renameWorkflow( UUID id, String name ) throws WorkflowRepoException
}
def.setName( name );
serializeToFile( new File( getWorkflowDir( id ), DEF_FILE ), def ); // updated definition
}


@Override
public void updateWorkflowGroup( UUID id, String group ) throws WorkflowRepoException {
WorkflowDefModel def = getWorkflowDef( id );
if ( def.getGroup().equals( group ) ) {
return;
}
def.setGroup( group );
serializeToFile( new File( getWorkflowDir( id ), DEF_FILE ), def ); // updated definition
}


Expand Down
Loading

0 comments on commit 391c678

Please sign in to comment.