Skip to content

Commit

Permalink
update: resolve review conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
JingkaiTang committed Dec 31, 2024
1 parent 2b7803e commit 8a6fb06
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ThreadPoolConfig {
* See java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor()
* or java.util.concurrent.Executors.newThreadPerTaskExecutor(ThreadFactory)
*/
public static final String USE_VIRTUAL_THREAD = "use_virtual_thread";
public static final String USE_THREAD_PER_TASK_EXECUTOR = "use_thread_per_task_executor";
/**
* Whether to use coroutine.
*/
Expand Down Expand Up @@ -104,7 +104,7 @@ public class ThreadPoolConfig {
private String namePrefix;
private boolean daemon = Boolean.TRUE;
private int closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private boolean useVirtualThread;
private boolean useThreadPerTaskExecutor;
private boolean useFiber;
private int fiberParallel;
private boolean shareSchedule;
Expand All @@ -121,23 +121,21 @@ public static ThreadPoolConfig parse(String id, Map<String, Object> extMap) {
Objects.requireNonNull(extMap, "extMap");
ThreadPoolConfig config = new ThreadPoolConfig();
config.id = id;
config.useVirtualThread = MapUtils.getBooleanValue(extMap, USE_VIRTUAL_THREAD, Boolean.FALSE);
if (!config.useVirtualThread) {
config.useFiber = MapUtils.getBooleanValue(extMap, USE_FIBER, Boolean.FALSE);
if (config.useFiber()) {
config.corePoolSize = MapUtils.getIntValue(extMap, CORE_POOL_SIZE, DEFAULT_BIZ_VIRTUAL_CORE_THREADS);
config.maximumPoolSize = MapUtils.getIntValue(extMap, MAXIMUM_POOL_SIZE, DEFAULT_BIZ_VIRTUAL_MAX_THREADS);
config.shareSchedule = MapUtils.getBooleanValue(extMap, SHARE_SCHEDULE, Boolean.TRUE);
config.fiberParallel = MapUtils.getIntValue(extMap, FIBER_PARALLEL, Constants.CPUS);
} else {
config.corePoolSize = MapUtils.getIntValue(extMap, CORE_POOL_SIZE, 0);
config.maximumPoolSize = MapUtils.getIntValue(extMap, MAXIMUM_POOL_SIZE, config.corePoolSize);
}
config.keepAliveTimeSeconds = MapUtils.getLongValue(extMap, KEEP_ALIVE_TIME_SECONDS,
DEFAULT_KEEP_ALIVE_TIME_SECONDS);
config.queueSize = MapUtils.getIntValue(extMap, QUEUE_SIZE, DEFAULT_QUEUE_SIZE);
config.allowCoreThreadTimeOut = MapUtils.getBoolean(extMap, ALLOW_CORE_THREAD_TIMEOUT, Boolean.TRUE);
config.useThreadPerTaskExecutor = MapUtils.getBooleanValue(extMap, USE_THREAD_PER_TASK_EXECUTOR, Boolean.FALSE);
config.useFiber = MapUtils.getBooleanValue(extMap, USE_FIBER, Boolean.FALSE);
if (config.useFiber()) {
config.corePoolSize = MapUtils.getIntValue(extMap, CORE_POOL_SIZE, DEFAULT_BIZ_VIRTUAL_CORE_THREADS);
config.maximumPoolSize = MapUtils.getIntValue(extMap, MAXIMUM_POOL_SIZE, DEFAULT_BIZ_VIRTUAL_MAX_THREADS);
config.shareSchedule = MapUtils.getBooleanValue(extMap, SHARE_SCHEDULE, Boolean.TRUE);
config.fiberParallel = MapUtils.getIntValue(extMap, FIBER_PARALLEL, Constants.CPUS);
} else {
config.corePoolSize = MapUtils.getIntValue(extMap, CORE_POOL_SIZE, 0);
config.maximumPoolSize = MapUtils.getIntValue(extMap, MAXIMUM_POOL_SIZE, config.corePoolSize);
}
config.keepAliveTimeSeconds = MapUtils.getLongValue(extMap, KEEP_ALIVE_TIME_SECONDS,
DEFAULT_KEEP_ALIVE_TIME_SECONDS);
config.queueSize = MapUtils.getIntValue(extMap, QUEUE_SIZE, DEFAULT_QUEUE_SIZE);
config.allowCoreThreadTimeOut = MapUtils.getBoolean(extMap, ALLOW_CORE_THREAD_TIMEOUT, Boolean.TRUE);
config.namePrefix = MapUtils.getString(extMap, NAME_PREFIX, id);
config.daemon = MapUtils.getBoolean(extMap, DAEMON, Boolean.TRUE);
config.closeTimeout = MapUtils.getIntValue(extMap, CLOSE_TIMEOUT, DEFAULT_CLOSE_TIMEOUT);
Expand Down Expand Up @@ -169,7 +167,7 @@ public Map<String, Object> toMap() {
map.put(NAME_PREFIX, namePrefix);
map.put(DAEMON, daemon);
map.put(CLOSE_TIMEOUT, closeTimeout);
map.put(USE_VIRTUAL_THREAD, useVirtualThread);
map.put(USE_THREAD_PER_TASK_EXECUTOR, useThreadPerTaskExecutor);
map.put(USE_FIBER, useFiber);
map.put(FIBER_PARALLEL, fiberParallel);
map.put(SHARE_SCHEDULE, shareSchedule);
Expand Down Expand Up @@ -257,12 +255,12 @@ public ThreadPoolConfig setCloseTimeout(int closeTimeout) {
return this;
}

public boolean useVirtualThread() {
return useVirtualThread;
public boolean useThreadPerTaskExecutor() {
return useThreadPerTaskExecutor;
}

public void setUseVirtualThread(boolean useVirtualThread) {
this.useVirtualThread = useVirtualThread;
public void setUseThreadPerTaskExecutor(boolean useThreadPerTaskThread) {
this.useThreadPerTaskExecutor = useThreadPerTaskThread;
}

public boolean useFiber() {
Expand All @@ -273,6 +271,10 @@ public void setUseFiber(boolean useFiber) {
this.useFiber = useFiber;
}

public boolean useVirtualThread() {
return useThreadPerTaskExecutor || useFiber;
}

public boolean isShareSchedule() {
return shareSchedule;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void init() throws TRpcExtensionException {
uncaughtExceptionHandler = new TrpcThreadExceptionHandler(errorCount, businessError, protocolError);

ThreadFactory threadFactory = null;
if (poolConfig.useVirtualThread() || poolConfig.useFiber()) {
if (poolConfig.useVirtualThread()) {
try {
// Since versions below OpenJDK 21 and Tencent JDK non-FIBER versions do not support coroutines,
// introducing the "java.lang.Thread.Builder.OfVirtual" dependency will result in an error,
Expand Down Expand Up @@ -130,7 +130,7 @@ && containsMethod(virtualClazz.getDeclaredMethods(), SCHEDULER_NAME)) {
uncaughtExceptionHandler);
logger.warn("If the server uses a synchronous interface, please increase the thread pool size");
}
if (poolConfig.useVirtualThread()) {
if (poolConfig.useThreadPerTaskExecutor()) {
try {
// Use JDK 21+ method Executors.newThreadPerTaskExecutor(ThreadFactory threadFactory)
// to create a virtual thread executor service
Expand All @@ -142,18 +142,16 @@ && containsMethod(virtualClazz.getDeclaredMethods(), SCHEDULER_NAME)) {
threadPool = wrappedThreadPool;
threadPoolMXBean = new ThreadPerTaskExecutorMXBeanImpl(wrappedThreadPool);

Check warning on line 143 in trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java

View check run for this annotation

Codecov / codecov/patch

trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java#L139-L143

Added lines #L139 - L143 were not covered by tests
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException exception) {
logger.error("The current JDK version does not support virtual threads, please use OpenJDK 21+, " +
"or remove use_virtual_thread config, error: ", exception);
throw new TRpcExtensionException("init worker pool with virtual threads failed", exception);
logger.warn("The current JDK version does not support virtual threads, please use OpenJDK 21+, " +

Check warning on line 145 in trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 '+' 应另起一行。 Raw Output: /github/workspace/./trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java:145:114: warning: '+' 应另起一行。 (com.puppycrawl.tools.checkstyle.checks.whitespace.OperatorWrapCheck)
"or remove use_thread_per_task_executor config, error: ", exception);
}

Check warning on line 147 in trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java

View check run for this annotation

Codecov / codecov/patch

trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java#L147

Added line #L147 was not covered by tests
} else {
threadPool = new ThreadPoolExecutor(poolConfig.getCorePoolSize(),
poolConfig.getMaximumPoolSize(), poolConfig.getKeepAliveTimeSeconds(),
TimeUnit.SECONDS, poolConfig.getQueueSize() <= 0 ? new LinkedTransferQueue<>()
: new LinkedBlockingQueue<>(poolConfig.getQueueSize()), threadFactory);
((ThreadPoolExecutor) threadPool).allowCoreThreadTimeOut(poolConfig.isAllowCoreThreadTimeOut());
threadPoolMXBean = new ThreadPoolMXBeanImpl((ThreadPoolExecutor) threadPool);
}
threadPool = new ThreadPoolExecutor(poolConfig.getCorePoolSize(),
poolConfig.getMaximumPoolSize(), poolConfig.getKeepAliveTimeSeconds(),
TimeUnit.SECONDS, poolConfig.getQueueSize() <= 0 ? new LinkedTransferQueue<>()
: new LinkedBlockingQueue<>(poolConfig.getQueueSize()), threadFactory);
((ThreadPoolExecutor) threadPool).allowCoreThreadTimeOut(poolConfig.isAllowCoreThreadTimeOut());
threadPoolMXBean = new ThreadPoolMXBeanImpl((ThreadPoolExecutor) threadPool);
MBeanRegistryHelper.registerMBean(threadPoolMXBean, threadPoolMXBean.getObjectName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void test() {
config.setNamePrefix("namePrefix");
config.setUseFiber(Boolean.TRUE);
config.setShareSchedule(Boolean.TRUE);
config.setUseVirtualThread(Boolean.FALSE);
config.setUseThreadPerTaskExecutor(Boolean.FALSE);
assertFalse(config.isAllowCoreThreadTimeOut());
assertEquals(50, config.getCloseTimeout());
assertEquals(40, config.getCorePoolSize());
Expand Down Expand Up @@ -67,7 +67,7 @@ public void testParse() {
properties.put(ThreadPoolConfig.DAEMON, Boolean.FALSE);
properties.put(ThreadPoolConfig.CLOSE_TIMEOUT, 10 * 1000);
properties.put(ThreadPoolConfig.ALLOW_CORE_THREAD_TIMEOUT, Boolean.TRUE);
properties.put(ThreadPoolConfig.USE_VIRTUAL_THREAD, Boolean.FALSE);
properties.put(ThreadPoolConfig.USE_THREAD_PER_TASK_EXECUTOR, Boolean.FALSE);
properties.put(ThreadPoolConfig.USE_FIBER, Boolean.TRUE);
properties.put(ThreadPoolConfig.SHARE_SCHEDULE, Boolean.TRUE);
ThreadPoolConfig config = ThreadPoolConfig.parse("1", properties);
Expand All @@ -81,7 +81,7 @@ public void testParse() {
assertEquals(2000, config.getMaximumPoolSize());
assertEquals("test", config.getNamePrefix());
assertEquals(0, config.getQueueSize());
assertFalse(config.useVirtualThread());
assertFalse(config.useThreadPerTaskExecutor());
assertTrue(config.useFiber());
assertTrue(config.isShareSchedule());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.tencent.trpc.core.exception.TRpcExtensionException;
import com.tencent.trpc.core.management.PoolMXBean;
import com.tencent.trpc.core.management.PoolMXBean.WorkerPoolType;
import com.tencent.trpc.core.management.ThreadPerTaskExecutorMXBeanImpl;
import com.tencent.trpc.core.management.ThreadPoolMXBean;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testCoroutines() {
@Test
public void testVirtualThreads() {
Map<String, Object> properties = getProperties();
properties.put(ThreadPoolConfig.USE_VIRTUAL_THREAD, Boolean.TRUE);
properties.put(ThreadPoolConfig.USE_THREAD_PER_TASK_EXECUTOR, Boolean.TRUE);
PluginConfig poolPluginConfig = new PluginConfig("work_pool", ThreadWorkerPool.class,
properties);
ThreadWorkerPool threadWorkerPool = new ThreadWorkerPool();
Expand All @@ -101,8 +102,13 @@ public void testVirtualThreads() {
Assert.assertEquals(0, threadPoolMXBean.getCompletedTaskCount());
Assert.assertEquals(0, threadPoolMXBean.getActiveThreadCount());
Assert.assertEquals(WorkerPoolType.THREAD.getName(), threadPoolMXBean.getType());
Assert.assertEquals(0, threadPoolMXBean.getCorePoolSize());
Assert.assertEquals(Integer.MAX_VALUE, threadPoolMXBean.getMaximumPoolSize());
if (threadPoolMXBean instanceof ThreadPerTaskExecutorMXBeanImpl) {
Assert.assertEquals(0, threadPoolMXBean.getCorePoolSize());
Assert.assertEquals(Integer.MAX_VALUE, threadPoolMXBean.getMaximumPoolSize());
} else {
Assert.assertEquals(2, threadPoolMXBean.getCorePoolSize());
Assert.assertEquals(2, threadPoolMXBean.getMaximumPoolSize());
}
Assert.assertNotNull(report.toString());
}

Expand Down

0 comments on commit 8a6fb06

Please sign in to comment.