Skip to content

Commit

Permalink
Add hook point for job observation
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed Oct 7, 2024
1 parent b8fe368 commit b3e36c9
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.engine.test.jobexecutor;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;

import org.assertj.core.api.InstanceOfAssertFactories;
import org.flowable.common.engine.impl.scripting.FlowableScriptEvaluationException;
import org.flowable.engine.impl.test.JobTestHelper;
import org.flowable.engine.impl.test.PluggableFlowableTestCase;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.engine.test.Deployment;
import org.flowable.job.api.JobInfo;
import org.flowable.job.api.TimerJobQuery;
import org.flowable.job.service.impl.asyncexecutor.JobExecutionObservation;
import org.flowable.job.service.impl.asyncexecutor.JobExecutionObservationProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* @author Filip Hrisafov
*/
class JobExecutionObservationTest extends PluggableFlowableTestCase {

protected TestJobExecutionObservationProvider testObservationProvider = new TestJobExecutionObservationProvider();

@BeforeEach
void setUp() {
processEngineConfiguration.getJobServiceConfiguration().setJobExecutionObservationProvider(testObservationProvider);
}

@Test
@Deployment(resources = { "org/flowable/engine/test/jobexecutor/AsyncExecutorTest.testRegularAsyncExecution.bpmn20.xml" })
public void regularTimerExecution() {
runtimeService.startProcessInstanceByKey("asyncExecutor");
assertThat(managementService.createTimerJobQuery().count()).isOne();

assertThat(testObservationProvider.createdObservations).isEmpty();

// Timer is set for 5 minutes, so move clock 10 minutes
processEngineConfiguration.getClock().setCurrentTime(Date.from(Instant.now().plus(10, ChronoUnit.MINUTES)));

waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(10000L, 500L);

assertThat(testObservationProvider.createdObservations)
.singleElement()
.satisfies(observation -> {
assertThat(observation.jobInfo).isNotNull();
assertThat(observation.started).isEqualTo(1);
assertThat(observation.stopped).isEqualTo(1);
assertThat(observation.lockErrors).isEmpty();
assertThat(observation.executionErrors).isEmpty();
assertThat(observation.scopes)
.extracting(scope -> scope.name)
.containsExactly("lock", "execution");
assertThat(observation.scopes)
.allSatisfy(scope -> assertThat(scope.closed).isTrue());
});
}

@Test
@Deployment(resources = { "org/flowable/engine/test/jobexecutor/asyncNonExclusiveServiceTask.bpmn20.xml" })
public void regularNonExclusive() {
runtimeService.startProcessInstanceByKey("asyncTask");
assertThat(managementService.createJobQuery().count()).isOne();

assertThat(testObservationProvider.createdObservations).isEmpty();

waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(10000L, 500L);

assertThat(testObservationProvider.createdObservations)
.singleElement()
.satisfies(observation -> {
assertThat(observation.jobInfo).isNotNull();
assertThat(observation.started).isEqualTo(1);
assertThat(observation.stopped).isEqualTo(1);
assertThat(observation.lockErrors).isEmpty();
assertThat(observation.executionErrors).isEmpty();
assertThat(observation.scopes)
.extracting(scope -> scope.name)
.containsExactly("execution");
assertThat(observation.scopes)
.allSatisfy(scope -> assertThat(scope.closed).isTrue());
});
}

@Test
@Deployment(resources = { "org/flowable/engine/test/api/mgmt/ManagementServiceTest.testGetJobExceptionStacktrace.bpmn20.xml" })
public void withExecutionException() {
TimerJobQuery query = managementService.createTimerJobQuery().withException();
assertThat(query.count()).isZero();

ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("exceptionInJobExecution");

// Timer is set for 4 hours, so move clock 5 hours
processEngineConfiguration.getClock().setCurrentTime(new Date(new Date().getTime() + 5 * 60 * 60 * 1000));

// The execution is waiting in the first usertask. This contains a
// boundary timer event which we will execute manual for testing purposes.
JobTestHelper.waitForJobExecutorOnCondition(processEngineConfiguration, 7000L, 100L, new Callable<>() {
@Override
public Boolean call() throws Exception {
return managementService.createTimerJobQuery().withException().count() == 1;
}
});

query = managementService.createTimerJobQuery().processInstanceId(processInstance.getId()).withException();
assertThat(query.count()).isEqualTo(1);

assertThat(testObservationProvider.createdObservations)
.singleElement()
.satisfies(observation -> {
assertThat(observation.jobInfo).isNotNull();
assertThat(observation.started).isEqualTo(1);
assertThat(observation.stopped).isEqualTo(1);
assertThat(observation.lockErrors).isEmpty();
assertThat(observation.executionErrors)
.singleElement(as(InstanceOfAssertFactories.throwable(Throwable.class)))
.isInstanceOf(FlowableScriptEvaluationException.class);
assertThat(observation.scopes)
.extracting(scope -> scope.name)
.containsExactly("lock", "execution");
assertThat(observation.scopes)
.allSatisfy(scope -> assertThat(scope.closed).isTrue());
});

}

static class TestJobExecutionObservationProvider implements JobExecutionObservationProvider {

protected final List<TestJobExecutionObservation> createdObservations = new ArrayList<>();

@Override
public JobExecutionObservation create(JobInfo job) {
TestJobExecutionObservation observation = new TestJobExecutionObservation(job);
createdObservations.add(observation);
return observation;
}
}

static class TestJobExecutionObservation implements JobExecutionObservation {

protected final JobInfo jobInfo;
protected int started = 0;
protected int stopped = 0;
protected final List<JobExecutionObservationScope> scopes = new ArrayList<>();
protected final List<Throwable> lockErrors = new ArrayList<>();
protected final List<Throwable> executionErrors = new ArrayList<>();

TestJobExecutionObservation(JobInfo jobInfo) {
this.jobInfo = jobInfo;
}

@Override
public void start() {
started++;
}

@Override
public void stop() {
stopped++;
}

@Override
public Scope lockScope() {
JobExecutionObservationScope scope = new JobExecutionObservationScope("lock");
scopes.add(scope);
return scope;
}

@Override
public void lockError(Throwable lockException) {
lockErrors.add(lockException);
}

@Override
public Scope executionScope() {
JobExecutionObservationScope scope = new JobExecutionObservationScope("execution");
scopes.add(scope);
return scope;
}

@Override
public void executionError(Throwable exception) {
executionErrors.add(exception);
}
}

static class JobExecutionObservationScope implements JobExecutionObservation.Scope {

protected final String name;
protected boolean closed;

JobExecutionObservationScope(String name) {
this.name = name;
}

@Override
public void close() {
closed = true;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:flowable="http://flowable.org/bpmn"
targetNamespace="Examples">

<process id="asyncTask">
<startEvent id="theStart" />
<sequenceFlow id="flow1" sourceRef="theStart" targetRef="serviceTask" />

<serviceTask id="serviceTask" name="Simple task" flowable:async="true" flowable:exclusive="false" flowable:expression="${true}" />

<sequenceFlow id="flow2" sourceRef="serviceTask" targetRef="theEnd" />
<endEvent id="theEnd" />
</process>

</definitions>
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;
import org.flowable.job.service.impl.asyncexecutor.DefaultJobManager;
import org.flowable.job.service.impl.asyncexecutor.FailedJobCommandFactory;
import org.flowable.job.service.impl.asyncexecutor.JobExecutionObservationProvider;
import org.flowable.job.service.impl.asyncexecutor.JobManager;
import org.flowable.job.service.impl.asyncexecutor.TimerJobScheduler;
import org.flowable.job.service.impl.asyncexecutor.TimerJobSchedulerImpl;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration<JobSer
protected Map<String, HistoryJobHandler> historyJobHandlers;
protected List<HistoryJobProcessor> historyJobProcessors;

protected JobExecutionObservationProvider jobExecutionObservationProvider = JobExecutionObservationProvider.NOOP;

public JobServiceConfiguration(String engineName) {
super(engineName);
}
Expand Down Expand Up @@ -579,4 +582,11 @@ public void addEnabledJobCategory(String jobCategory) {
enabledJobCategories.add(jobCategory);
}

public JobExecutionObservationProvider getJobExecutionObservationProvider() {
return jobExecutionObservationProvider;
}

public void setJobExecutionObservationProvider(JobExecutionObservationProvider jobExecutionObservationProvider) {
this.jobExecutionObservationProvider = jobExecutionObservationProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ private List<AsyncRunnableExecutionExceptionHandler> initializeExceptionHandlers
@Override
public void run() {
TenantContext tenantContext = CurrentTenant.getTenantContext();
JobExecutionObservation observation = jobServiceConfiguration.getJobExecutionObservationProvider().create(job);
try {
tenantContext.setTenantId(job.getTenantId());
runInternally();
observation.start();
runInternally(observation);
} finally {
observation.stop();
tenantContext.clearTenantId();
}
}

protected void runInternally() {
protected void runInternally(JobExecutionObservation observation) {

if (job instanceof Job) {
Job jobObject = (Job) job;
Expand All @@ -101,21 +104,21 @@ protected void runInternally() {
boolean lockingNeeded = ((AbstractRuntimeJobEntity) job).isExclusive();
boolean executeJob = true;
if (lockingNeeded) {
executeJob = lockJob();
executeJob = lockJob(observation);
}
if (executeJob) {
executeJob(lockingNeeded);
executeJob(lockingNeeded, observation);
}

} else { // history jobs
executeJob(false); // no locking for history jobs needed
executeJob(false, observation); // no locking for history jobs needed

}

}

protected void executeJob(final boolean unlock) {
try {
protected void executeJob(final boolean unlock, JobExecutionObservation observation) {
try (JobExecutionObservation.Scope ignored = observation.executionScope()) {
jobServiceConfiguration.getCommandExecutor().execute(
new ExecuteAsyncRunnableJobCmd(job.getId(), jobEntityManager, jobServiceConfiguration, unlock));

Expand All @@ -134,8 +137,14 @@ protected void executeJob(final boolean unlock) {
+ "Exception message: {}", e.getMessage());
}

observation.executionError(e);

} catch (Throwable exception) {
handleFailedJob(exception);
try {
handleFailedJob(exception);
} finally {
observation.executionError(exception);
}
}
}

Expand Down Expand Up @@ -164,9 +173,9 @@ protected void unlockJobIfNeeded() {
}
}

protected boolean lockJob() {
protected boolean lockJob(JobExecutionObservation observation) {
Job job = (Job) this.job; // This method is only called for a regular Job
try {
try (JobExecutionObservation.Scope ignored = observation.lockScope()) {
jobServiceConfiguration.getCommandExecutor().execute(new LockExclusiveJobCmd(job, jobServiceConfiguration));

} catch (Throwable lockException) {
Expand All @@ -177,6 +186,8 @@ protected boolean lockJob() {
// Release the job again so it can be acquired later or by another node
unacquireJob();

observation.lockError(lockException);

return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.job.service.impl.asyncexecutor;

/**
* @author Filip Hrisafov
*/
public interface JobExecutionObservation {

JobExecutionObservation NOOP = new NoopJobExecutionObservationProvider.NoopJobExecutionObservation();

void start();

void stop();

Scope lockScope();

void lockError(Throwable lockException);

Scope executionScope();

void executionError(Throwable exception);

interface Scope extends AutoCloseable {

@Override
void close();
}
}
Loading

0 comments on commit b3e36c9

Please sign in to comment.