Skip to content

Commit

Permalink
Fixes bug when foreach iteration aborts affecting other iterations
Browse files Browse the repository at this point in the history
When a foreach iteration aborts, the whole task is marked as
aborted, but it should not affect the other iterations of the
same loop, which should start anyway, because the loop concept assumes
that all iterations run in parallel and could start at the
same time (before any of the iterations abort).

This bug was detected when the thread pool is small (e.g. size 1),
where iterations do not run in parallel, but it should behave as
they where launched at the same time with an infinite thread pool.
  • Loading branch information
lipido committed Sep 18, 2020
1 parent 85763f9 commit bf81e64
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>org.sing_group</groupId>
<artifactId>compi</artifactId>
<version>1.3.1</version>
<version>1.3.2</version>
<!--
WARNING: change version using (in the parent project):
mvn versions:set -DnewVersion=[new_version]
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.sing_group</groupId>
<artifactId>compi</artifactId>
<version>1.3.1</version>
<version>1.3.2</version>
<!--
WARNING: change version using (in the parent project):
mvn versions:set -DnewVersion=[new_version]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ private void initializeForEach(Foreach foreach) throws IllegalArgumentException
int index = 0;
for (final String source : values) {
ForeachIteration iteration = createIterationForForeach(foreach, source, index++);
this.tasksLeft.add(iteration);
this.forEachTasks.get(foreach).add(iteration);
}
this.dag.getDependantsOfTask(foreach).stream().forEach(dependency -> {
Expand Down Expand Up @@ -293,6 +292,10 @@ private void initializeForEach(Foreach foreach) throws IllegalArgumentException
});
}
});

for (ForeachIteration iteration : this.forEachTasks.get(foreach)) {
this.tasksLeft.add(iteration);
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/java/org/sing_group/compi/core/TaskRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ public void run() {
if (!this.task.isAborted()) {
taskAborted(this.task, ((ForeachIteration) task).getParentForeachTask().getAbortionCause());
}
} else {
this.process = this.getProcess(this.task);
openLogBuffers(this.process);
waitForProcess(this.process);
taskFinished(this.task);
}

this.process = this.getProcess(this.task);
openLogBuffers(this.process);
waitForProcess(this.process);
taskFinished(this.task);

} else {
taskFinished(this.task);
}
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/java/org/sing_group/compi/tests/PipelineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,25 @@ public void testPipelineIterationBindedLoopWithSingleTask() throws Exception {
}
}

@Test
public void testPipelineIterationBindedWithAbortedIterations() throws Exception {
final String pipelineFile =
ClassLoader.getSystemResource("testPipelineIterationBindedWithAbortedIterations.xml").getFile();

final CompiApp compi =
new CompiApp(
forPipeline(fromFile(new File(pipelineFile)), new File(pipelineFile)).whichRunsAMaximumOf(1)
.build()
);

TestExecutionHandler handler = new TestExecutionHandler();
compi.addTaskExecutionHandler(handler);

compi.run();
assertTrue(handler.getFinishedTasksIncludingLoopChildren().size() == 6);

}

@Test
public void testTaskExecutionHandler() throws Exception {
final String pipelineFile = ClassLoader.getSystemResource("testExecutionHandler.xml").getFile();
Expand Down Expand Up @@ -492,6 +511,10 @@ public void testTaskExecutionHandler() throws Exception {
handler.taskIterationAborted(capture(capturesForTaskID3[1]), anyObject());
expectLastCall();

// ID3: iteration 2, runs equally
handler.taskIterationFinished(capture(capturesForTaskID3[3]));
expectLastCall();

// the whole task ID3 aborts...
handler.taskAborted(eq(tasksById.get("ID3")), anyObject());
expectLastCall();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
#%L
Compi Core
%%
Copyright (C) 2016 - 2018 Daniel Glez-Peña, Osvaldo Graña-Castro, Hugo
López-Fernández, Jesús Álvarez Casanova
%%
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
#L%
-->

<pipeline xmlns="http://www.sing-group.org/compi/pipeline-1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<version>1.0.0</version>
<tasks>
<foreach id="ID-1" of="list" in="-j,-l,-l" as="param">
ls ${param} /tmp
</foreach>

<foreach id="ID-2" after="*ID-1" of="list" in="-l,-l,-l" as="param">
ls ${param} /tmp
</foreach>

<foreach id="ID-3" after="*ID-2" of="list" in="-l,-l,-l" as="param">
ls ${param} /tmp
</foreach>
</tasks>
</pipeline>
2 changes: 1 addition & 1 deletion dk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>org.sing_group</groupId>
<artifactId>compi</artifactId>
<version>1.3.1</version>
<version>1.3.2</version>
<!--
WARNING: change version using (in the parent project):
mvn versions:set -DnewVersion=[new_version]
Expand Down
2 changes: 1 addition & 1 deletion e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>org.sing_group</groupId>
<artifactId>compi</artifactId>
<version>1.3.1</version>
<version>1.3.2</version>
<!--
WARNING: change version using (in the parent project):
mvn versions:set -DnewVersion=[new_version]
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<groupId>org.sing_group</groupId>
<artifactId>compi</artifactId>
<packaging>pom</packaging>
<version>1.3.1</version>
<version>1.3.2</version>
<!-- WARNING: change version using (in the parent project): mvn versions:set -DnewVersion=[new_version] mvn versions:commit This will change the version
in all modules at-once -->

Expand Down

0 comments on commit bf81e64

Please sign in to comment.