Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBPM-10242] Allow the possibility of disabling linear search for removeJob and getTimerByName operations #2441

Merged
merged 5 commits into from
Oct 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
Expand Down Expand Up @@ -114,6 +118,7 @@ public void executeTimerJob(Timer timer) {
try {
executeTimerJobInstance(timerJobInstance);
} catch (Exception e) {
logger.error("Error executing timer handle {}", timerJobInstance.getJobHandle(), e);
recoverTimerJobInstance(timerJob, timer, e);
}
}
Expand All @@ -139,7 +144,7 @@ else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != nul
// because of the transaction, so we need to do this here.
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
if (removeJob(timerJobInstance.getJobHandle(), timer)) {
internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
Expand Down Expand Up @@ -246,107 +251,98 @@ private Serializable removeTransientFields(Serializable info) {
return info;
}

public boolean removeJob(JobHandle jobHandle, Timer ejbTimer) {
private boolean disableLinearSearch(String suffix) {
return Boolean.getBoolean("org.jbpm.ejb.timer.disable.linear." + suffix);
}


public boolean removeJob(JobHandle jobHandle, Timer ejbTimer) {
EjbGlobalJobHandle ejbHandle = (EjbGlobalJobHandle) jobHandle;
if (useLocalCache) {
boolean removedFromCache = localCache.remove(ejbHandle.getUuid()) != null;
logger.debug("Job handle {} is {} removed from cache ", jobHandle, removedFromCache ? "" : "not" );
logger.debug("Job handle {} is {} removed from cache ", jobHandle, removedFromCache ? "" : "not");
}

if (ejbTimer != null) {
try {
ejbTimer.cancel();
return true;
} catch (Throwable e) {
logger.debug("Timer cancel error due to {}", e.getMessage());
return false;
}
return cancelTimer(ejbTimer, ejbHandle);
}

// small speed improvement using the ejb serializable info handler
GlobalJpaTimerJobInstance timerJobInstance = (GlobalJpaTimerJobInstance) ejbHandle.getTimerJobInstance();
if (timerJobInstance != null) {
Object ejbTimerHandle = timerJobInstance.getTimerInfo();
if(ejbTimerHandle instanceof TimerHandle) {
Object ejbTimerHandle = timerJobInstance.getTimerInfo();
if (ejbTimerHandle instanceof TimerHandle) {
try {
((TimerHandle) ejbTimerHandle).getTimer().cancel();
} catch (Throwable e) {
logger.debug("Timer cancel error due to {}", e.getMessage());
return true;
} catch (Exception ex) {
logger.warn("Cancelling timer failed for handle {}", ejbHandle, ex);
return false;
}
return true;
}
} else {
logger.warn("No timerJobInstance available for {}", ejbHandle);
}
logger.debug("No valid TimerJob instance {} available for Job handle {}", timerJobInstance, ejbHandle);
return linearSearch("remove", ejbHandle.getUuid(),
(timer, job) -> cancelTimer(timer, (EjbGlobalJobHandle) job.getJobHandle())).orElse(false);
}

for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;

EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();
if (handle.getUuid().equals(ejbHandle.getUuid())) {
logger.debug("Job handle {} does match timer and is going to be canceled", jobHandle);

try {
timer.cancel();
} catch (Throwable e) {
logger.debug("Timer cancel error due to {}", e.getMessage());
return false;
}
return true;
}
}
} catch (NoSuchObjectLocalException e) {
logger.debug("Timer {} has already expired or was canceled ", timer);
}
}
logger.debug("Job handle {} does not match any timer on {} scheduler service", jobHandle, this);
return false;
}
public TimerJobInstance getTimerByName(String jobName) {
if (useLocalCache) {
TimerJobInstance found = localCache.get(jobName);
if (found != null) {
logger.debug("Found timer job instance with name {} in cache, returning {}", jobName, found);
return found;
}
logger.debug("Timer Job Instance with name {} not found in cache", jobName);
}
return linearSearch("search", jobName, (timer, job) -> {
if (useLocalCache && job != null) {
localCache.putIfAbsent(jobName, job);
}
return job;
}).orElse(null);
}


private boolean cancelTimer(Timer timer, EjbGlobalJobHandle ejbHandle) {
try {
timer.cancel();
return true;
} catch (Exception ex) {
logger.warn("Cancelling timer failed for handle {}", ejbHandle, ex);
return false;
}
}

public TimerJobInstance getTimerByName(String jobName) {
if (useLocalCache) {
if (localCache.containsKey(jobName)) {
logger.debug("Found job {} in cache returning", jobName);
return localCache.get(jobName);
}
}
TimerJobInstance found = null;

for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;

EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();

if (handle.getUuid().equals(jobName)) {
found = handle.getTimerJobInstance();
if (useLocalCache) {
localCache.putIfAbsent(jobName, found);
}
logger.debug("Job {} does match timer and is going to be returned {}", jobName, found);

break;
}
}
} catch (NoSuchObjectLocalException e) {
logger.debug("Timer info for {} was not found ", timer);
private <T> Optional<T> linearSearch(String suffix, String uuid, BiFunction<Timer, TimerJobInstance, T> function) {
if (disableLinearSearch(suffix)) {
logger.warn("Skipping linear search to {} UUID {}", suffix, uuid);
} else {
logger.info("Searching UUID {} on {} scheduler service", uuid, this);
for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;
EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();
if (handle.getUuid().equals(uuid)) {
logger.debug("UIID {} does match timer {} and handle {}", uuid, timer,
job.getTimerJobInstance());
return Optional.ofNullable(function.apply(timer, job.getTimerJobInstance()));
}
}
} catch (NoSuchObjectLocalException e) {
logger.info("Info for timer {} is not there ", timer, e);
}
}
}

return found;
}
logger.info("UUID {} does not match any timer on {} scheduler service", uuid, this);
}
return Optional.empty();
}

public void evictCache(JobHandle jobHandle) {
String jobName = ((EjbGlobalJobHandle) jobHandle).getUuid();
logger.debug("Invalidate job {} with job name {} in cache", jobName, localCache.remove(jobName));
logger.debug("Invalidate job {} with job name {} in cache", localCache.remove(jobName), jobName);
}

}
Loading