Skip to content

Commit

Permalink
[SYSTEMDS-3572] CommonThreadPool Reuse ThreadLocal Pools
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 7, 2023
1 parent e045582 commit fc835d1
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,67 @@
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;

/**
* This common thread pool provides an abstraction to obtain a shared
* thread pool, specifically the ForkJoinPool.commonPool, for all requests
* of the maximum degree of parallelism. If pools of different size are
* This common thread pool provides an abstraction to obtain a shared thread pool, specifically the
* ForkJoinPool.commonPool, for all requests of the maximum degree of parallelism. If pools of different size are
* requested, we create new pool instances of FixedThreadPool.
*/
public class CommonThreadPool implements ExecutorService
{
//shared thread pool used system-wide, potentially by concurrent parfor workers
//we use the ForkJoinPool.commonPool() to avoid explicit cleanup, including
//unnecessary initialization (e.g., problematic in jmlc) and because this commonPool
//resulted in better performance than a dedicated fixed thread pool.
public class CommonThreadPool implements ExecutorService {
// shared thread pool used system-wide, potentially by concurrent parfor workers
// we use the ForkJoinPool.commonPool() to avoid explicit cleanup, including
// unnecessary initialization (e.g., problematic in jmlc) and because this commonPool
// resulted in better performance than a dedicated fixed thread pool.
private static final int size = InfrastructureAnalyzer.getLocalParallelism();
private static final ExecutorService shared = ForkJoinPool.commonPool();
// a secondary pool that have a different number of threads.
// this is used in case an instruction is using a different number of threads.
private static ExecutorService shared2 = null;
private static int shared2K = -1;
private final ExecutorService _pool;
public static ExecutorService triggerRemoteOPsPool = null;

public CommonThreadPool(ExecutorService pool) {
_pool = pool;
}

/**
* Get the shared Executor thread pool, that have the number of threads of the host system
*
* @return An ExecutorService
*/
public static ExecutorService get() {
return shared;
}

public static ExecutorService get(int k) {
return new CommonThreadPool( (size==k) ?
shared : Executors.newFixedThreadPool(k));
if(size == k){
return new CommonThreadPool(shared);
}
else if(shared2!= null && shared2K == k){
return new CommonThreadPool(shared2);
}
else if(shared2 == null){
// have to be a fork join pool.
shared2 = new ForkJoinPool(k);
shared2K = k;
return new CommonThreadPool(shared2);
}
else{
return new CommonThreadPool(Executors.newFixedThreadPool(k));
}
}

public static boolean isSharedTPThreads(int k){
return InfrastructureAnalyzer.getLocalParallelism() == k || shared2K ==k || shared2K == -1;
}

public static <T> void invokeAndShutdown(ExecutorService pool, Collection<? extends Callable<T>> tasks) {
try {
//execute tasks
// execute tasks
List<Future<T>> ret = pool.invokeAll(tasks);
//check for errors and exceptions
for( Future<T> r : ret )
// check for errors and exceptions
for(Future<T> r : ret)
r.get();
//shutdown pool
// shutdown pool
pool.shutdown();
}
catch(Exception ex) {
Expand All @@ -80,23 +108,22 @@ public static void shutdownShared() {
}

public static void shutdownAsyncRDDPool() {
if (triggerRemoteOPsPool != null) {
//shutdown prefetch/broadcast thread pool
if(triggerRemoteOPsPool != null) {
// shutdown prefetch/broadcast thread pool
triggerRemoteOPsPool.shutdown();
triggerRemoteOPsPool = null;
}
}

@Override
public void shutdown() {
if( _pool != shared )
if(_pool != shared && _pool != shared2)
_pool.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return ( _pool != shared ) ?
_pool.shutdownNow() : null;
return (_pool != shared && _pool != shared2) ? _pool.shutdownNow() : null;
}

@Override
Expand All @@ -106,10 +133,10 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) th

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
throws InterruptedException {
return _pool.invokeAll(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
_pool.execute(command);
Expand All @@ -130,8 +157,7 @@ public Future<?> submit(Runnable task) {
return _pool.submit(task);
}


//unnecessary methods required for API compliance
// unnecessary methods required for API compliance
@Override
public boolean isShutdown() {
throw new NotImplementedException();
Expand All @@ -154,7 +180,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws Interrupt

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throws InterruptedException, ExecutionException, TimeoutException {
throw new NotImplementedException();
}
}

0 comments on commit fc835d1

Please sign in to comment.