Skip to content

Commit

Permalink
Fix more scheduling bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 3, 2025
1 parent 260089c commit 1388019
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,7 @@ private void openWorkflow( final Context ctx ) {
private void saveSession( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
SaveSessionRequest request = ctx.bodyAsClass( SaveSessionRequest.class );
process( ctx, () -> {
sessionManager.saveUserSession( sessionId, request.getMessage() );
return "success";
} );
process( ctx, () -> sessionManager.saveUserSession( sessionId, request.getMessage() ) );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ public void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID,
}

}
if ( !requiredInPorts.isEmpty() ) {
if ( !requiredInPorts.isEmpty() && wrapper.getState() != ActivityState.SAVED) { // already saved activities do not need their predecessors in the subDag
throw new IllegalStateException( "Activity is missing the required data input(s) " + requiredInPorts + ": " + wrapper );
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ public synchronized ExecutionMonitor startExecution( Workflow workflow, StorageM
interruptedSessions.remove( sessionId );
ExecutionMonitor monitor = new ExecutionMonitor( workflow, targetActivity, monitoringCallback );
WorkflowScheduler scheduler;
List<ExecutionSubmission> submissions;
try {
scheduler = new WorkflowScheduler( workflow, sm, monitor, GLOBAL_WORKERS, targetActivity );
submissions = scheduler.startExecution();
} catch ( Exception e ) {
monitor.stop();
workflow.setState( WorkflowState.IDLE );
monitor.stop();
throw e;
}
List<ExecutionSubmission> submissions = scheduler.startExecution();
if ( submissions.isEmpty() ) {
throw new GenericRuntimeException( "At least one activity needs to be executable when submitting a workflow for execution" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,14 @@ public List<ExecutionSubmission> handleExecutionResult( ExecutionResult result )
return null;
}

return computeNextSubmissions();
try {
return computeNextSubmissions();
} catch ( Exception e ) {
// this should never happen, but as a fallback we finish workflow execution
log.error( "An unexpected error occurred while determining the next activities to be submitted", e );
setFinished();
return null;
}
}


Expand All @@ -270,6 +277,9 @@ private void updateTransactions( ExecutionResult result ) throws TransactionExce

private List<ExecutionSubmission> computeNextSubmissions() {
List<SubmissionFactory> factories = optimizer.computeNextTrees( maxWorkers - pendingCount, activePartition.commonType );
if (pendingCount == 0 && factories.isEmpty()) {
throw new IllegalStateException("The optimizer is unable to determine the next activity to be executed");
}
pendingCount += factories.size();
List<ExecutionSubmission> submissions = factories.stream().map( f -> f.create( sm, workflow ) ).toList();
for ( ExecutionSubmission submission : submissions ) {
Expand Down Expand Up @@ -362,7 +372,9 @@ private void propagateResult( boolean isActive, Edge edge, AttributedDirectedGra
case INACTIVE -> {
target.setState( ActivityState.SKIPPED );
remainingActivities.remove( target.getId() );
targetPartition.setResolved( target.getId(), false ); // no need to catch the exception, as the transaction is already rolled back
if ( targetPartition != null) { // in case of initial propagation for saved activities, there is no targetPartition yet
targetPartition.setResolved( target.getId(), false ); // no need to catch the exception, as the transaction is already rolled back
}
// a skipped activity does NOT count as failed -> onFail control edges also become INACTIVE
dag.getOutwardEdges( target.getId() ).forEach( e -> propagateResult( false, workflow.getEdge( e ), dag, ignorePartitions ) ); // TODO: out edges from workflow or DAG?
}
Expand All @@ -388,6 +400,9 @@ private void updatePartitions() {


private void setFinished() {
if (!remainingActivities.isEmpty()) {
setStates( remainingActivities, ActivityState.SKIPPED );
}
workflow.setState( WorkflowState.IDLE );
executionMonitor.stop();
isFinished = true;
Expand Down Expand Up @@ -418,7 +433,7 @@ private Partition( CommonType commonType, AttributedDirectedGraph<UUID, Executio
this.commonType = commonType;
this.isAtomic = commonType != CommonType.NONE;
this.activities = dag.vertexSet().stream().filter( n -> workflow.getActivity( n ).getConfig().getCommonType() == commonType ).collect( Collectors.toSet() );
this.remaining.addAll( activities );
this.remaining.addAll( activities.stream().filter( n -> !workflow.getActivity( n ).getState().isExecuted() ).toList() );
this.isFinished = activities.isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class SessionModel {
// USER_SESSION fields:
UUID workflowId;
Integer version;
WorkflowDefModel workflowDef;


public SessionModel( SessionModelType type, UUID sId, int connectionCount ) {
Expand All @@ -39,7 +40,7 @@ public SessionModel( SessionModelType type, UUID sId, int connectionCount ) {
this.connectionCount = connectionCount;
this.workflowId = null;
this.version = null;

this.workflowDef = null;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,11 @@ protected AbstractSession( Workflow workflow, UUID sessionId ) {
* @param session the UI websocket session to be registered
*/
public void subscribe( Session session ) {
System.out.println( "subscribed" );
subscribers.add( session );
}


public void unsubscribe( Session session ) {
System.out.println( "unsubscribed" );
subscribers.remove( session );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ public void terminateSession( UUID sId ) {
}


public void saveUserSession( UUID sId, String versionDesc ) throws WorkflowRepoException {
public int saveUserSession( UUID sId, String versionDesc ) throws WorkflowRepoException {
UserSession session = getUserSessionOrThrow( sId );
int version = repo.writeVersion( session.getWId(), versionDesc, session.getWorkflowModel( false ) );
session.setOpenedVersion( version );
return version;
}


Expand Down Expand Up @@ -133,9 +134,9 @@ private boolean removeSession( UUID sId ) {
}


private UUID registerUserSession( Workflow wf, UUID wId, int version ) {
private UUID registerUserSession( Workflow wf, UUID wId, int version ) throws WorkflowRepoException {
UUID sId = UUID.randomUUID();
UserSession session = new UserSession( sId, wf, wId, version );
UserSession session = new UserSession( sId, wf, wId, version, repo.getWorkflowDef( wId ) );
userSessions.put( sId, session );
return sId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.models.SessionModel;
import org.polypheny.db.workflow.models.SessionModel.SessionModelType;
import org.polypheny.db.workflow.models.WorkflowDefModel;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest;
Expand All @@ -41,19 +42,20 @@
import org.polypheny.db.workflow.models.responses.WsResponse.RenderingUpdateResponse;
import org.polypheny.db.workflow.models.responses.WsResponse.StateUpdateResponse;

@Getter
public class UserSession extends AbstractSession {

@Getter
private final UUID wId;
@Getter
@Setter
private int openedVersion;
private final WorkflowDefModel workflowDef;


public UserSession( UUID sessionId, Workflow wf, UUID workflowId, int openedVersion ) {
public UserSession( UUID sessionId, Workflow wf, UUID workflowId, int openedVersion, WorkflowDefModel workflowDef ) {
super( wf, sessionId );
this.wId = workflowId;
this.openedVersion = openedVersion;
this.workflowDef = workflowDef;
}


Expand Down Expand Up @@ -107,6 +109,8 @@ public void handleRequest( CreateEdgeRequest request ) {
@Override
public void handleRequest( DeleteEdgeRequest request ) {
throwIfNotEditable();
workflow.deleteEdge( request.edge, sm );
broadcastMessage( new StateUpdateResponse( request.msgId, workflow ) );
}


Expand Down Expand Up @@ -145,7 +149,7 @@ public void handleRequest( InterruptRequest request ) {

@Override
public SessionModel toModel() {
return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion );
return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion, workflowDef);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public void onMessage( final WsMessageContext ctx ) {


public void closed( WsCloseContext ctx ) {
System.out.println( "closed websocket: " + ctx.reason() );
sessions.get( ctx.session ).unsubscribe( ctx.session );
sessions.remove( ctx.session );
}
Expand Down

0 comments on commit 1388019

Please sign in to comment.