Skip to content

Commit

Permalink
Threadpool update
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Aug 8, 2023
1 parent 274bfbf commit e746042
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/apache/sysds/api/DMLScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ public static void cleanupHadoopExecution( DMLConfig config )
FederatedData.clearFederatedWorkers();

//0) shutdown prefetch/broadcast thread pool if necessary
CommonThreadPool.shutdownAsyncRDDPool();
CommonThreadPool.shutdownAsyncPools();

//1) cleanup scratch space (everything for current uuid)
//(required otherwise export to hdfs would skip assumed unnecessary writes if same name)
Expand Down
63 changes: 34 additions & 29 deletions src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
Expand Down Expand Up @@ -62,21 +61,23 @@ public class CommonThreadPool implements ExecutorService {
*/
private static final ExecutorService shared = ForkJoinPool.commonPool();
/** A secondary thread local executor that use a custom number of threads */
private static ExecutorService shared2 = null;
private static CommonThreadPool shared2 = null;
/** The number of threads used in the custom secondary executor */
private static int shared2K = -1;
/** Dynamic thread pool, that dynamically allocate threads as tasks come in. */
private static ExecutorService triggerRemoteOPsPool = null;
private static ExecutorService asyncPool = null;
/** This common thread pool */
private final ExecutorService _pool;

/**
* Private constructor of the threadPool.
* Constructor of the threadPool.
* This is intended not to be used except for tests.
* Please use the static constructors.
*
* @param pool The thread pool instance to use.
*/
private CommonThreadPool(ExecutorService pool) {
_pool = pool;
public CommonThreadPool(ExecutorService pool) {
this._pool = pool;
}

/**
Expand Down Expand Up @@ -109,12 +110,11 @@ else if(shared2 == null) {
shared2K = k;
return shared2;
}
else {
return Executors.newFixedThreadPool(k);
}
else
return new CommonThreadPool(Executors.newFixedThreadPool(k));
}
else
return Executors.newFixedThreadPool(k);
return new CommonThreadPool(Executors.newFixedThreadPool(k));
}

/**
Expand All @@ -124,7 +124,7 @@ else if(shared2 == null) {
* @return If we have a cached thread pool.
*/
public static boolean isSharedTPThreads(int k) {
return InfrastructureAnalyzer.getLocalParallelism() == k || shared2K == k || shared2K == -1;
return size == k || shared2K == k || shared2K == -1;
}

/**
Expand Down Expand Up @@ -156,27 +156,33 @@ public static <T> void invokeAndShutdown(ExecutorService pool, Collection<? exte
* @return A dynamic thread pool.
*/
public static ExecutorService getDynamicPool() {
if(triggerRemoteOPsPool != null)
return triggerRemoteOPsPool;
if(asyncPool != null)
return asyncPool;
else {
triggerRemoteOPsPool = Executors.newCachedThreadPool();
return triggerRemoteOPsPool;
asyncPool = Executors.newCachedThreadPool();
return asyncPool;
}
}

/**
* Shutdown the RDD Thread pool.
* Shutdown the cached thread pools.
*/
public static void shutdownAsyncRDDPool() {
if(triggerRemoteOPsPool != null) {
public static void shutdownAsyncPools() {
if(asyncPool != null) {
// shutdown prefetch/broadcast thread pool
triggerRemoteOPsPool.shutdown();
triggerRemoteOPsPool = null;
asyncPool.shutdown();
asyncPool = null;
}
if(shared2 != null) {
// shutdown shared custom thread count pool
shared2.shutdown();
shared2 = null;
shared2K = -1;
}
}

private boolean isCached() {
return _pool == shared || _pool == shared2;
public final boolean isCached() {
return _pool.equals(shared) || this.equals(shared2);
}

@Override
Expand All @@ -187,7 +193,7 @@ public void shutdown() {

@Override
public List<Runnable> shutdownNow() {
return !isCached() ? null : _pool.shutdownNow();
return !isCached() ? _pool.shutdownNow() : null;
}

@Override
Expand Down Expand Up @@ -221,30 +227,29 @@ public Future<?> submit(Runnable task) {
return _pool.submit(task);
}

// unnecessary methods required for API compliance
@Override
public boolean isShutdown() {
throw new NotImplementedException();
return isCached() || _pool.isShutdown();
}

@Override
public boolean isTerminated() {
throw new NotImplementedException();
return isCached() || _pool.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
throw new NotImplementedException();
return isCached() || _pool.awaitTermination(timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
throw new NotImplementedException();
return _pool.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new NotImplementedException();
return _pool.invokeAny(tasks);
}
}
Loading

0 comments on commit e746042

Please sign in to comment.