Skip to content

Commit

Permalink
Client update for more decentralized functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
GedMarc committed Feb 28, 2024
1 parent 6ddcfac commit c2d44ce
Show file tree
Hide file tree
Showing 17 changed files with 340 additions and 782 deletions.
524 changes: 227 additions & 297 deletions src/main/java/com/guicedee/guicedinjection/GuiceContext.java

Large diffs are not rendered by default.

87 changes: 56 additions & 31 deletions src/main/java/com/guicedee/guicedinjection/JobService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.guicedee.guicedinjection;

import com.google.inject.Singleton;
import com.guicedee.guicedinjection.interfaces.IGuicePreDestroy;
import com.guicedee.guicedinjection.interfaces.*;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
Expand All @@ -18,8 +18,7 @@

@Singleton
@Log
public class JobService
implements IGuicePreDestroy<JobService>
public class JobService implements IGuicePreDestroy<JobService>, IJobService
{
private final Map<String, ExecutorService> serviceMap = new ConcurrentHashMap<>();
private final Map<String, ScheduledExecutorService> pollingMap = new ConcurrentHashMap<>();
Expand All @@ -34,17 +33,9 @@ public class JobService
@Setter
private static TimeUnit defaultWaitUnit = TimeUnit.SECONDS;

private static final JobService INSTANCE = new JobService();
public static final JobService INSTANCE = new JobService();
private static ExecutorService jobCleanup = null;

public static JobService getInstance(){
if (jobCleanup == null)
{
jobCleanup = INSTANCE.jobCleanup();
}
return INSTANCE;
}

public JobService()
{
//No config required
Expand All @@ -55,6 +46,7 @@ public JobService()
*
* @return
*/
@Override
public Set<String> getJobPools()
{
return serviceMap.keySet();
Expand All @@ -65,6 +57,7 @@ public Set<String> getJobPools()
*
* @return
*/
@Override
public Set<String> getPollingPools()
{
return pollingMap.keySet();
Expand All @@ -75,6 +68,7 @@ public Set<String> getPollingPools()
*
* @param pool The pool to remove
*/
@Override
public ExecutorService removeJob(String pool)
{
ExecutorService es = serviceMap.get(pool);
Expand All @@ -93,6 +87,7 @@ public ExecutorService removeJob(String pool)
*
* @param pool The pool name to remove
*/
@Override
public ScheduledExecutorService removePollingJob(String pool)
{
ScheduledExecutorService es = pollingMap.get(pool);
Expand All @@ -112,6 +107,7 @@ public ScheduledExecutorService removePollingJob(String pool)
* @param name
* @param executorService
*/
@Override
public ExecutorService registerJobPool(String name, ExecutorService executorService)
{
if (serviceMap.containsKey(name))
Expand All @@ -126,7 +122,8 @@ public ExecutorService registerJobPool(String name, ExecutorService executorServ
if (executorService instanceof ForkJoinPool)
{
ForkJoinPool pool = (ForkJoinPool) executorService;
} else if (executorService instanceof ThreadPoolExecutor)
}
else if (executorService instanceof ThreadPoolExecutor)
{
ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
executor.setMaximumPoolSize(maxQueueCount.get(name));
Expand All @@ -143,6 +140,7 @@ public ExecutorService registerJobPool(String name, ExecutorService executorServ
* @param name The name of the pool
* @param executorService The service executor
*/
@Override
public ScheduledExecutorService registerJobPollingPool(String name, ScheduledExecutorService executorService)
{
if (pollingMap.containsKey(name))
Expand All @@ -159,9 +157,14 @@ public ScheduledExecutorService registerJobPollingPool(String name, ScheduledExe
* @param jobPoolName
* @param thread
*/
@Override
public ExecutorService addJob(String jobPoolName, Runnable thread)
{
if (!serviceMap.containsKey(jobPoolName) || serviceMap.get(jobPoolName).isTerminated() || serviceMap.get(jobPoolName).isShutdown())
if (!serviceMap.containsKey(jobPoolName) || serviceMap
.get(jobPoolName)
.isTerminated() || serviceMap
.get(jobPoolName)
.isShutdown())
{
registerJobPool(jobPoolName, executorServiceSupplier.get());
}
Expand All @@ -171,7 +174,7 @@ public ExecutorService addJob(String jobPoolName, Runnable thread)
{
log.log(Level.FINER, maxQueueCount + " Hit - Finishing before next run");
removeJob(jobPoolName);
service = registerJobPool(jobPoolName,executorServiceSupplier.get());
service = registerJobPool(jobPoolName, executorServiceSupplier.get());
}
service.execute(thread);
return service;
Expand All @@ -183,9 +186,14 @@ public ExecutorService addJob(String jobPoolName, Runnable thread)
* @param jobPoolName
* @param thread
*/
@Override
public Future<?> addTask(String jobPoolName, Callable<?> thread)
{
if (!serviceMap.containsKey(jobPoolName) || serviceMap.get(jobPoolName).isTerminated() || serviceMap.get(jobPoolName).isShutdown())
if (!serviceMap.containsKey(jobPoolName) || serviceMap
.get(jobPoolName)
.isTerminated() || serviceMap
.get(jobPoolName)
.isShutdown())
{
registerJobPool(jobPoolName, executorServiceSupplier.get());
}
Expand All @@ -200,11 +208,13 @@ public Future<?> addTask(String jobPoolName, Callable<?> thread)
return service.submit(thread);
}

@Override
public void waitForJob(String jobName)
{
waitForJob(jobName, defaultWaitTime, defaultWaitUnit);
}

@Override
public void waitForJob(String jobName, long timeout, TimeUnit unit)
{
if (!serviceMap.containsKey(jobName))
Expand All @@ -216,7 +226,8 @@ public void waitForJob(String jobName, long timeout, TimeUnit unit)
try
{
service.awaitTermination(timeout, unit);
} catch (InterruptedException e)
}
catch (InterruptedException e)
{
log.log(Level.WARNING, "Thread didn't close cleanly, make sure running times are acceptable", e);
service.shutdownNow();
Expand All @@ -231,15 +242,15 @@ public void waitForJob(String jobName, long timeout, TimeUnit unit)
private ExecutorService jobCleanup()
{
ScheduledExecutorService jobsShutdownNotClosed = addPollingJob("JobsShutdownNotClosed", () -> {
for (String jobPool : getInstance().getJobPools())
for (String jobPool : getJobPools())
{
ExecutorService executorService = serviceMap.get(jobPool);
if(executorService.isShutdown() && !executorService.isTerminated())
if (executorService.isShutdown() && !executorService.isTerminated())
{
log.fine("Closing unfinished job - " + jobPool);
removeJob(jobPool);
}
if(executorService.isShutdown() && executorService.isTerminated())
if (executorService.isShutdown() && executorService.isTerminated())
{
log.fine("Cleaning terminated job - " + jobPool);
executorService.close();
Expand All @@ -257,12 +268,19 @@ private ExecutorService jobCleanup()
* @param jobPoolName
* @param thread
*/
@Override
public ScheduledExecutorService addPollingJob(String jobPoolName, Runnable thread, long delay, TimeUnit unit)
{
if (!pollingMap.containsKey(jobPoolName) || pollingMap.get(jobPoolName).isTerminated() || pollingMap.get(jobPoolName).isShutdown())
if (!pollingMap.containsKey(jobPoolName) || pollingMap
.get(jobPoolName)
.isTerminated() || pollingMap
.get(jobPoolName)
.isShutdown())
{
registerJobPollingPool(jobPoolName, Executors.newScheduledThreadPool(Runtime.getRuntime()
.availableProcessors()));
registerJobPollingPool(jobPoolName,
Executors.newScheduledThreadPool(Runtime
.getRuntime()
.availableProcessors()));
}
ScheduledExecutorService service = pollingMap.get(jobPoolName);
service.scheduleAtFixedRate(thread, 1L, delay, unit);
Expand All @@ -275,12 +293,19 @@ public ScheduledExecutorService addPollingJob(String jobPoolName, Runnable threa
* @param jobPoolName
* @param thread
*/
@Override
public ScheduledExecutorService addPollingJob(String jobPoolName, Runnable thread, long initialDelay, long delay, TimeUnit unit)
{
if (!pollingMap.containsKey(jobPoolName) || pollingMap.get(jobPoolName).isTerminated() || pollingMap.get(jobPoolName).isShutdown())
if (!pollingMap.containsKey(jobPoolName) || pollingMap
.get(jobPoolName)
.isTerminated() || pollingMap
.get(jobPoolName)
.isShutdown())
{
registerJobPollingPool(jobPoolName, Executors.newScheduledThreadPool(Runtime.getRuntime()
.availableProcessors()));
registerJobPollingPool(jobPoolName,
Executors.newScheduledThreadPool(Runtime
.getRuntime()
.availableProcessors()));
}
ScheduledExecutorService service = pollingMap.get(jobPoolName);
service.scheduleAtFixedRate(thread, initialDelay, delay, unit);
Expand All @@ -290,16 +315,15 @@ public ScheduledExecutorService addPollingJob(String jobPoolName, Runnable threa
/**
* Shutdowns
*/
@Override
public void destroy()
{
log.config("Destroying all running jobs...");
serviceMap.forEach((key, value) ->
{
serviceMap.forEach((key, value) -> {
log.config("Shutting Down [" + key + "]");
removeJob(key);
});
pollingMap.forEach((key, value) ->
{
pollingMap.forEach((key, value) -> {
log.config("Shutting Down Poll Job [" + key + "]");
removePollingJob(key);
});
Expand All @@ -312,7 +336,8 @@ private int getCurrentTaskCount(ExecutorService service)
{
ForkJoinPool pool = (ForkJoinPool) service;
return (int) pool.getQueuedTaskCount();
} else if (service instanceof ThreadPoolExecutor)
}
else if (service instanceof ThreadPoolExecutor)
{
ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
return (int) executor.getTaskCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.guicedee.guicedinjection.implementations;

import com.guicedee.client.*;
import com.guicedee.guicedinjection.*;
import com.guicedee.guicedinjection.interfaces.*;

public class GuiceContextProvision implements IGuiceProvider
{
@Override
public IGuiceContext get()
{
return GuiceContext.instance();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.guicedee.guicedinjection.implementations;

import com.guicedee.guicedinjection.*;
import com.guicedee.guicedinjection.interfaces.*;

public class JobServiceProvision implements IJobServiceProvider
{
@Override
public IJobService get()
{
return JobService.INSTANCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.guicedee.guicedinjection.GuiceContext;

import com.guicedee.guicedinjection.JobService;
import com.guicedee.guicedinjection.interfaces.IGuiceModule;
import com.guicedee.guicedinjection.interfaces.*;
import com.guicedee.guicedinjection.properties.GlobalProperties;
import io.github.classgraph.ScanResult;
import lombok.extern.java.Log;
Expand Down Expand Up @@ -43,7 +43,9 @@ public void configure() {
.in(Singleton.class);

ContextBinderGuice.log.fine("Bound JobService.class");
bind(IJobService.class)
.toInstance(JobService.INSTANCE);
bind(JobService.class)
.toInstance(JobService.getInstance());
.toInstance(JobService.INSTANCE);
}
}

This file was deleted.

Loading

0 comments on commit c2d44ce

Please sign in to comment.