diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/management/AbstractThreadPoolMXBean.java b/trpc-core/src/main/java/com/tencent/trpc/core/management/AbstractThreadPoolMXBean.java new file mode 100644 index 0000000000..48b56dc4ed --- /dev/null +++ b/trpc-core/src/main/java/com/tencent/trpc/core/management/AbstractThreadPoolMXBean.java @@ -0,0 +1,41 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.management; + +import java.util.concurrent.atomic.AtomicInteger; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +public abstract class AbstractThreadPoolMXBean implements ThreadPoolMXBean { + + private static final AtomicInteger threadPoolIndex = new AtomicInteger(1); + + private final String objectName; + + public AbstractThreadPoolMXBean() { + this.objectName = WorkerPoolType.THREAD.getName() + BAR + threadPoolIndex.getAndIncrement(); + } + + @Override + public String getType() { + return WorkerPoolType.THREAD.getName(); + } + + @Override + public ObjectName getObjectName() { + try { + return new ObjectName(WORKER_POOL_MXBEAN_DOMAIN_TYPE + ",name=" + objectName); + } catch (MalformedObjectNameException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorMXBeanImpl.java b/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorMXBeanImpl.java new file mode 100644 index 0000000000..cf993c860b --- /dev/null +++ b/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorMXBeanImpl.java @@ -0,0 +1,74 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.management; + +import com.tencent.trpc.core.logger.Logger; +import com.tencent.trpc.core.logger.LoggerFactory; + +/** + * Implementation of ThreadPoolMXBean for ThreadPerTaskExecutor using ThreadPerTaskExecutorWrapper. + *

+ * JEP 444 recommends using JFR to monitor virtual threads. + */ +public class ThreadPerTaskExecutorMXBeanImpl extends AbstractThreadPoolMXBean { + + protected static final Logger logger = LoggerFactory.getLogger(ThreadPerTaskExecutorMXBeanImpl.class); + + private final ThreadPerTaskExecutorWrapper wrapper; + + public ThreadPerTaskExecutorMXBeanImpl(ThreadPerTaskExecutorWrapper wrapper) { + this.wrapper = wrapper; + } + + private long totalTaskCount() { + return wrapper.getSubmittedTaskCount(); + } + + private long completedTaskCount() { + return wrapper.getCompletedTaskCount(); + } + + private long executingTaskCount() { + return totalTaskCount() - completedTaskCount(); + } + + @Override + public long getTaskCount() { + return executingTaskCount(); + } + + @Override + public long getCompletedTaskCount() { + return completedTaskCount(); + } + + @Override + public int getCorePoolSize() { + return 0; + } + + @Override + public int getMaximumPoolSize() { + return Integer.MAX_VALUE; + } + + @Override + public int getPoolSize() { + return (int) executingTaskCount(); + } + + @Override + public int getActiveThreadCount() { + return (int) executingTaskCount(); + } + +} diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java b/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java new file mode 100644 index 0000000000..4c7bb0fc68 --- /dev/null +++ b/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapper.java @@ -0,0 +1,171 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.management; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class ThreadPerTaskExecutorWrapper implements ExecutorService { + + private final ExecutorService executorService; + private final AtomicLong submittedTaskCount = new AtomicLong(); + private final AtomicLong completedTaskCount = new AtomicLong(); + + private ThreadPerTaskExecutorWrapper(ExecutorService executorService) { + this.executorService = executorService; + } + + public static ThreadPerTaskExecutorWrapper wrap(ExecutorService executorService) { + return new ThreadPerTaskExecutorWrapper(executorService); + } + + private TaskCountingRunnable wrap(Runnable task) { + return new TaskCountingRunnable(task); + } + + private TaskCountingCallable wrap(Callable task) { + return new TaskCountingCallable<>(task); + } + + public long getSubmittedTaskCount() { + return submittedTaskCount.get(); + } + + public long getCompletedTaskCount() { + return completedTaskCount.get(); + } + + private class TaskCountingRunnable implements Runnable { + + private final Runnable task; + + public TaskCountingRunnable(Runnable task) { + this.task = task; + } + + @Override + public void run() { + submittedTaskCount.incrementAndGet(); + try { + task.run(); + } finally { + completedTaskCount.incrementAndGet(); + } + } + } + + private class TaskCountingCallable implements Callable { + + private final Callable task; + + public TaskCountingCallable(Callable task) { + this.task = task; + } + + @Override + public T call() throws Exception { + submittedTaskCount.incrementAndGet(); + try { + return task.call(); + } finally { + completedTaskCount.incrementAndGet(); + } + } + } + + @Override + public void shutdown() { + executorService.shutdown(); + } + + @Override + public List shutdownNow() { + return executorService.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executorService.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executorService.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return executorService.submit(wrap(task)); + } + + @Override + public Future submit(Runnable task, T result) { + return executorService.submit(wrap(task), result); + } + + @Override + public Future submit(Runnable task) { + return executorService.submit(wrap(task)); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + List> wrappedTasks = tasks.stream() + .map(this::wrap) + .collect(Collectors.toList()); + return executorService.invokeAll(wrappedTasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + List> wrappedTasks = tasks.stream() + .map(this::wrap) + .collect(Collectors.toList()); + return executorService.invokeAll(wrappedTasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + List> wrappedTasks = tasks.stream() + .map(this::wrap) + .collect(Collectors.toList()); + return executorService.invokeAny(wrappedTasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + List> wrappedTasks = tasks.stream() + .map(this::wrap) + .collect(Collectors.toList()); + return executorService.invokeAny(wrappedTasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + executorService.execute(wrap(command)); + } +} diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPoolMXBeanImpl.java b/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPoolMXBeanImpl.java index cd8ef5ec7e..9262801e60 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPoolMXBeanImpl.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/management/ThreadPoolMXBeanImpl.java @@ -13,26 +13,14 @@ import java.util.Objects; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -public class ThreadPoolMXBeanImpl implements ThreadPoolMXBean { +public class ThreadPoolMXBeanImpl extends AbstractThreadPoolMXBean { - private static final AtomicInteger threadPoolIndex = new AtomicInteger(1); private final ThreadPoolExecutor threadPool; - private final String objectName; - public ThreadPoolMXBeanImpl(ThreadPoolExecutor threadPool) { this.threadPool = Objects.requireNonNull(threadPool, "threadPool is null"); - this.objectName = WorkerPoolType.THREAD.getName() + BAR + threadPoolIndex.getAndIncrement(); - } - - @Override - public String getType() { - return WorkerPoolType.THREAD.getName(); } @Override @@ -65,13 +53,5 @@ public int getMaximumPoolSize() { return threadPool.getMaximumPoolSize(); } - @Override - public ObjectName getObjectName() { - try { - return new ObjectName(WORKER_POOL_MXBEAN_DOMAIN_TYPE + ",name=" + objectName); - } catch (MalformedObjectNameException e) { - throw new IllegalArgumentException(e); - } - } } \ No newline at end of file diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfig.java b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfig.java index ab30127839..0c6d1561da 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfig.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfig.java @@ -1,7 +1,7 @@ /* * Tencent is pleased to support the open source community by making tRPC available. * - * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * Copyright (C) 2023 THL A29 Limited, a Tencent company. * All rights reserved. * * If you have downloaded a copy of the tRPC source code from Tencent, @@ -60,6 +60,13 @@ public class ThreadPoolConfig { * Default 10 seconds timeout. */ public static final String CLOSE_TIMEOUT = "close_timeout"; + /** + * Whether to use virtual threads for Java21. + *

+ * See java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor() + * or java.util.concurrent.Executors.newThreadPerTaskExecutor(ThreadFactory) + */ + public static final String USE_VIRTUAL_THREAD_PER_TASK_EXECUTOR = "use_virtual_thread_per_task_executor"; /** * Whether to use coroutine. */ @@ -96,6 +103,10 @@ public class ThreadPoolConfig { private String namePrefix; private boolean daemon = Boolean.TRUE; private int closeTimeout = DEFAULT_CLOSE_TIMEOUT; + /** + * Whether to use virtual threads Executors.newThreadPerTaskExecutor + */ + private boolean useVirtualThreadPerTaskExecutor; private boolean useFiber; private int fiberParallel; private boolean shareSchedule; @@ -112,6 +123,8 @@ public static ThreadPoolConfig parse(String id, Map extMap) { Objects.requireNonNull(extMap, "extMap"); ThreadPoolConfig config = new ThreadPoolConfig(); config.id = id; + config.useVirtualThreadPerTaskExecutor = MapUtils.getBooleanValue(extMap, USE_VIRTUAL_THREAD_PER_TASK_EXECUTOR, + Boolean.FALSE); config.useFiber = MapUtils.getBooleanValue(extMap, USE_FIBER, Boolean.FALSE); if (config.useFiber()) { config.corePoolSize = MapUtils.getIntValue(extMap, CORE_POOL_SIZE, DEFAULT_BIZ_VIRTUAL_CORE_THREADS); @@ -139,11 +152,11 @@ public static void validate(PluginConfig config) { } public void validate() { - PreconditionUtils.checkArgument(corePoolSize > 0, "id[%s],corePoolSize[%s] should > 0", id, + PreconditionUtils.checkArgument(corePoolSize >= 0, "id[%s],corePoolSize[%s] should >= 0", id, corePoolSize); - PreconditionUtils.checkArgument(queueSize >= 0, "id[%s],queueSize[%s] should > 0", id, + PreconditionUtils.checkArgument(queueSize >= 0, "id[%s],queueSize[%s] should >= 0", id, queueSize); - PreconditionUtils.checkArgument(closeTimeout >= 0, "id[%s],queueSize[%s] should > 0", id, + PreconditionUtils.checkArgument(closeTimeout >= 0, "id[%s],queueSize[%s] should >= 0", id, closeTimeout); } @@ -157,6 +170,7 @@ public Map toMap() { map.put(NAME_PREFIX, namePrefix); map.put(DAEMON, daemon); map.put(CLOSE_TIMEOUT, closeTimeout); + map.put(USE_VIRTUAL_THREAD_PER_TASK_EXECUTOR, useVirtualThreadPerTaskExecutor); map.put(USE_FIBER, useFiber); map.put(FIBER_PARALLEL, fiberParallel); map.put(SHARE_SCHEDULE, shareSchedule); @@ -244,6 +258,14 @@ public ThreadPoolConfig setCloseTimeout(int closeTimeout) { return this; } + public boolean useVirtualThreadPerTaskExecutor() { + return useVirtualThreadPerTaskExecutor; + } + + public void setUseVirtualThreadPerTaskExecutor(boolean useVirtualThreadPerTaskExecutor) { + this.useVirtualThreadPerTaskExecutor = useVirtualThreadPerTaskExecutor; + } + public boolean useFiber() { return useFiber; } diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java index d253408f36..29d97a4fcc 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java @@ -23,6 +23,8 @@ import com.tencent.trpc.core.logger.Logger; import com.tencent.trpc.core.logger.LoggerFactory; import com.tencent.trpc.core.management.PoolMXBean; +import com.tencent.trpc.core.management.ThreadPerTaskExecutorMXBeanImpl; +import com.tencent.trpc.core.management.ThreadPerTaskExecutorWrapper; import com.tencent.trpc.core.management.ThreadPoolMXBean; import com.tencent.trpc.core.management.ThreadPoolMXBeanImpl; import com.tencent.trpc.core.management.support.MBeanRegistryHelper; @@ -59,6 +61,8 @@ public class ThreadWorkerPool extends AbstractWorkerPool private static final String NAME = "name"; private static final String SCHEDULER_NAME = "scheduler"; private static final String FACTORY_NAME = "factory"; + private static final String EXECUTORS_CLASS_NAME = "java.util.concurrent.Executors"; + private static final String NEW_THREAD_PER_TASK_EXECUTOR_NAME = "newThreadPerTaskExecutor"; private ExecutorService threadPool; private ThreadPoolConfig poolConfig; @@ -90,41 +94,27 @@ public void init() throws TRpcExtensionException { protocolError = new AtomicLong(0); uncaughtExceptionHandler = new TrpcThreadExceptionHandler(errorCount, businessError, protocolError); - ThreadFactory threadFactory = null; - if (poolConfig.useFiber()) { + ThreadFactory threadFactory = getThreadFactory(poolConfig); + if (poolConfig.useVirtualThreadPerTaskExecutor()) { try { - // Since versions below OpenJDK 21 and Tencent JDK non-FIBER versions do not support coroutines, - // introducing the "java.lang.Thread.Builder.OfVirtual" dependency will result in an error, - // so we create coroutines through reflection, which is compatible with JDKs that do not support - // coroutines. When the JDK does not support coroutines, it downgrades to threads. - Class threadClazz = ReflectionUtils.forName(THREAD_CLASS_NAME); - Method ofVirtualMethod = threadClazz.getDeclaredMethod(OF_VIRTUAL_NAME); - Object virtual = ofVirtualMethod.invoke(threadClazz); - Class virtualClazz = ofVirtualMethod.getReturnType(); - Method nameMethod = virtualClazz.getMethod(NAME, String.class, long.class); - nameMethod.invoke(virtual, poolConfig.getNamePrefix(), 1); - // Only Tencent Kona JDK FIBER 8+ version support the scheduler method, OpenJDK 21 version does not - // support the scheduler method. - if (!poolConfig.isShareSchedule() - && containsMethod(virtualClazz.getDeclaredMethods(), SCHEDULER_NAME)) { - Method schedulerMethod = virtualClazz.getDeclaredMethod(SCHEDULER_NAME, Executor.class); - schedulerMethod.setAccessible(true); - schedulerMethod.invoke(virtual, Executors.newWorkStealingPool(poolConfig.getFiberParallel())); - } - Method factoryMethod = virtualClazz.getMethod(FACTORY_NAME); - threadFactory = (ThreadFactory) factoryMethod.invoke(virtual); + // Use JDK 21+ method Executors.newThreadPerTaskExecutor(ThreadFactory threadFactory) + // to create a virtual thread executor service + Class executorsClazz = ReflectionUtils.forName(EXECUTORS_CLASS_NAME); + Method newThreadPerTaskExecutorMethod = executorsClazz + .getDeclaredMethod(NEW_THREAD_PER_TASK_EXECUTOR_NAME, ThreadFactory.class); + ThreadPerTaskExecutorWrapper wrappedThreadPool = ThreadPerTaskExecutorWrapper + .wrap((ExecutorService) newThreadPerTaskExecutorMethod.invoke(executorsClazz, threadFactory)); + threadPool = wrappedThreadPool; + threadPoolMXBean = new ThreadPerTaskExecutorMXBeanImpl(wrappedThreadPool); + MBeanRegistryHelper.registerMBean(threadPoolMXBean, threadPoolMXBean.getObjectName()); + logger.info("Successfully created an executor that assigns each task to a " + + "new virtual thread for processing"); + return; } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException exception) { - logger.error("The current JDK version cannot use coroutines, please use OpenJDK 21+ or Tencent " - + "Kona JDK FIBER 8+ version, error: ", exception); + logger.warn("The current JDK version does not support virtual threads, please use OpenJDK 21+, " + + "or remove use_thread_per_task_executor config, error: ", exception); } } - // If coroutines cannot be used, downgrade to threads - if (threadFactory == null) { - threadFactory = new NamedThreadFactory(poolConfig.getNamePrefix(), poolConfig.isDaemon(), - uncaughtExceptionHandler); - logger.warn("If the server uses a synchronous interface, please increase the thread pool size"); - } - threadPool = new ThreadPoolExecutor(poolConfig.getCorePoolSize(), poolConfig.getMaximumPoolSize(), poolConfig.getKeepAliveTimeSeconds(), TimeUnit.SECONDS, poolConfig.getQueueSize() <= 0 ? new LinkedTransferQueue<>() @@ -215,9 +205,50 @@ public UncaughtExceptionHandler getUncaughtExceptionHandler() { return this.uncaughtExceptionHandler; } - private boolean containsMethod(Method[] methods, String name) { + private ThreadFactory getThreadFactory(ThreadPoolConfig poolConfig) { + ThreadFactory threadFactory = null; + // Whether to use virtual threads + if (poolConfig.useFiber() || poolConfig.useVirtualThreadPerTaskExecutor()) { + try { + // Since versions below OpenJDK 21 and Tencent JDK non-FIBER versions do not support virtual threads, + // introducing the "java.lang.Thread.Builder.OfVirtual" dependency will result in an error, + // so we create virtual threads through reflection, which is compatible with JDKs that do not support + // virtual threads. When the JDK does not support virtual threads, it downgrades to thread. + Class threadClazz = ReflectionUtils.forName(THREAD_CLASS_NAME); + Method ofVirtualMethod = threadClazz.getDeclaredMethod(OF_VIRTUAL_NAME); + Object virtual = ofVirtualMethod.invoke(threadClazz); + Class virtualClazz = ofVirtualMethod.getReturnType(); + Method nameMethod = virtualClazz.getMethod(NAME, String.class, long.class); + nameMethod.invoke(virtual, poolConfig.getNamePrefix(), 1); + // Only Tencent Kona JDK FIBER 8+ version support the scheduler method, OpenJDK 21 version does not + // support the scheduler method. + if (poolConfig.useFiber() + && !poolConfig.isShareSchedule() + && containsMethod(virtualClazz.getDeclaredMethods(), SCHEDULER_NAME)) { + Method schedulerMethod = virtualClazz.getDeclaredMethod(SCHEDULER_NAME, Executor.class); + schedulerMethod.setAccessible(true); + schedulerMethod.invoke(virtual, Executors.newWorkStealingPool(poolConfig.getFiberParallel())); + } + Method factoryMethod = virtualClazz.getMethod(FACTORY_NAME); + threadFactory = (ThreadFactory) factoryMethod.invoke(virtual); + logger.info("Successfully created virtual thread factory"); + return threadFactory; + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException exception) { + logger.error("The current JDK version cannot use virtual threads, please use OpenJDK 21+ or " + + "Tencent Kona JDK FIBER 8+ version, error: ", exception); + } + } + // If virtual threads cannot be used, downgrade to threads + threadFactory = new NamedThreadFactory(poolConfig.getNamePrefix(), poolConfig.isDaemon(), + uncaughtExceptionHandler); + logger.warn("Successfully created thread factory. If the server uses a synchronous interface, " + + "please increase the thread pool size"); + return threadFactory; + } + + private boolean containsMethod(Method[] methods, String methodName) { for (Method method : methods) { - if (method.getName().equals(name)) { + if (method.getName().equals(methodName)) { return true; } } diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorMXBeanTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorMXBeanTest.java new file mode 100644 index 0000000000..d5c6072f6e --- /dev/null +++ b/trpc-core/src/test/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorMXBeanTest.java @@ -0,0 +1,34 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.management; + +import java.util.concurrent.ExecutorService; +import org.junit.Assert; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; + +public class ThreadPerTaskExecutorMXBeanTest { + + @Test + public void testThreadPerTaskExecutorMXBean() { + ExecutorService executorService = PowerMockito.mock(ExecutorService.class); + ThreadPerTaskExecutorWrapper wrapper = ThreadPerTaskExecutorWrapper.wrap(executorService); + ThreadPoolMXBean mxBean = new ThreadPerTaskExecutorMXBeanImpl(wrapper); + Assert.assertEquals(0, mxBean.getPoolSize()); + Assert.assertEquals(0, mxBean.getActiveThreadCount()); + Assert.assertEquals(0, mxBean.getTaskCount()); + Assert.assertEquals(0, mxBean.getCompletedTaskCount()); + Assert.assertEquals(0, mxBean.getCorePoolSize()); + Assert.assertEquals(Integer.MAX_VALUE, mxBean.getMaximumPoolSize()); + } + +} \ No newline at end of file diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapperTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapperTest.java new file mode 100644 index 0000000000..1c20521f13 --- /dev/null +++ b/trpc-core/src/test/java/com/tencent/trpc/core/management/ThreadPerTaskExecutorWrapperTest.java @@ -0,0 +1,79 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.management; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Assert; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; + +public class ThreadPerTaskExecutorWrapperTest { + + @Test + public void testThreadPerTaskExecutorWrapper() throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(1); + ThreadPerTaskExecutorWrapper wrapper = ThreadPerTaskExecutorWrapper.wrap(executorService); + Callable callable = () -> "mock"; + Future submit = wrapper.submit(callable); + submit.get(); + Assert.assertEquals(1, wrapper.getSubmittedTaskCount()); + Assert.assertEquals(1, wrapper.getCompletedTaskCount()); + List> callables = Arrays.asList(callable, callable, callable); + List> futures = wrapper.invokeAll(callables); + for (Future future : futures) { + future.get(); + } + Assert.assertEquals(4, wrapper.getSubmittedTaskCount()); + Assert.assertEquals(4, wrapper.getCompletedTaskCount()); + Runnable runnable = () -> { + }; + Future future = wrapper.submit(runnable); + future.get(); + Assert.assertEquals(5, wrapper.getSubmittedTaskCount()); + Assert.assertEquals(5, wrapper.getCompletedTaskCount()); + } + + @Test + public void testThreadPerTaskExecutorWrapper2() throws ExecutionException, InterruptedException, TimeoutException { + ExecutorService executorService = PowerMockito.mock(ExecutorService.class); + ThreadPerTaskExecutorWrapper wrapper = ThreadPerTaskExecutorWrapper.wrap(executorService); + Runnable runnable = () -> { + }; + Callable callable = () -> "mock"; + List> callables = Collections.singletonList(callable); + wrapper.execute(runnable); + wrapper.submit(runnable); + wrapper.submit(runnable, "mock"); + wrapper.submit(callable); + wrapper.invokeAll(callables); + wrapper.invokeAll(callables, 1, TimeUnit.SECONDS); + wrapper.invokeAny(callables); + wrapper.invokeAny(callables, 1, TimeUnit.SECONDS); + wrapper.shutdown(); + wrapper.shutdownNow(); + wrapper.isShutdown(); + wrapper.isTerminated(); + wrapper.awaitTermination(1, TimeUnit.SECONDS); + Assert.assertEquals(0, wrapper.getSubmittedTaskCount()); + Assert.assertEquals(0, wrapper.getCompletedTaskCount()); + } + +} \ No newline at end of file diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfigTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfigTest.java index 641f4bf573..d08b39feda 100644 --- a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfigTest.java +++ b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadPoolConfigTest.java @@ -1,7 +1,7 @@ /* * Tencent is pleased to support the open source community by making tRPC available. * - * Copyright (C) 2023 THL A29 Limited, a Tencent company. + * Copyright (C) 2023 THL A29 Limited, a Tencent company. * All rights reserved. * * If you have downloaded a copy of the tRPC source code from Tencent, @@ -19,6 +19,7 @@ import com.tencent.trpc.core.worker.WorkerPoolManager; import java.util.HashMap; import java.util.Map; +import org.junit.Assert; import org.junit.Test; public class ThreadPoolConfigTest { @@ -36,6 +37,7 @@ public void test() { config.setNamePrefix("namePrefix"); config.setUseFiber(Boolean.TRUE); config.setShareSchedule(Boolean.TRUE); + config.setUseVirtualThreadPerTaskExecutor(Boolean.FALSE); assertFalse(config.isAllowCoreThreadTimeOut()); assertEquals(50, config.getCloseTimeout()); assertEquals(40, config.getCorePoolSize()); @@ -64,6 +66,7 @@ public void testParse() { properties.put(ThreadPoolConfig.DAEMON, Boolean.FALSE); properties.put(ThreadPoolConfig.CLOSE_TIMEOUT, 10 * 1000); properties.put(ThreadPoolConfig.ALLOW_CORE_THREAD_TIMEOUT, Boolean.TRUE); + properties.put(ThreadPoolConfig.USE_VIRTUAL_THREAD_PER_TASK_EXECUTOR, Boolean.FALSE); properties.put(ThreadPoolConfig.USE_FIBER, Boolean.TRUE); properties.put(ThreadPoolConfig.SHARE_SCHEDULE, Boolean.TRUE); ThreadPoolConfig config = ThreadPoolConfig.parse("1", properties); @@ -77,7 +80,23 @@ public void testParse() { assertEquals(2000, config.getMaximumPoolSize()); assertEquals("test", config.getNamePrefix()); assertEquals(0, config.getQueueSize()); + assertFalse(config.useVirtualThreadPerTaskExecutor()); assertTrue(config.useFiber()); assertTrue(config.isShareSchedule()); } + + @Test + public void testValidate() { + ThreadPoolConfig config = new ThreadPoolConfig(); + config.setCorePoolSize(-1); + Assert.assertThrows(IllegalArgumentException.class, config::validate); + config.setCorePoolSize(0); + config.setQueueSize(-1); + Assert.assertThrows(IllegalArgumentException.class, config::validate); + config.setQueueSize(0); + config.setCloseTimeout(-1); + Assert.assertThrows(IllegalArgumentException.class, config::validate); + config.setCloseTimeout(0); + config.validate(); + } } diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java index fb15540ddc..3a4311a62e 100644 --- a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java +++ b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java @@ -13,8 +13,10 @@ import com.tencent.trpc.core.common.config.PluginConfig; import com.tencent.trpc.core.exception.TRpcException; +import com.tencent.trpc.core.exception.TRpcExtensionException; import com.tencent.trpc.core.management.PoolMXBean; import com.tencent.trpc.core.management.PoolMXBean.WorkerPoolType; +import com.tencent.trpc.core.management.ThreadPerTaskExecutorMXBeanImpl; import com.tencent.trpc.core.management.ThreadPoolMXBean; import java.util.HashMap; import java.util.Map; @@ -23,6 +25,8 @@ public class ThreadWorkerPoolTest { + public static final int DEFALUT_POOL_SIZE = 2; + @Test public void testInit() { Map properties = getProperties(); @@ -54,8 +58,8 @@ public void testInit() { private Map getProperties() { Map properties = new HashMap<>(); - properties.put(ThreadPoolConfig.CORE_POOL_SIZE, 2); - properties.put(ThreadPoolConfig.MAXIMUM_POOL_SIZE, 2); + properties.put(ThreadPoolConfig.CORE_POOL_SIZE, DEFALUT_POOL_SIZE); + properties.put(ThreadPoolConfig.MAXIMUM_POOL_SIZE, DEFALUT_POOL_SIZE); properties.put(ThreadPoolConfig.KEEP_ALIVE_TIME_SECONDS, 300000); properties.put(ThreadPoolConfig.QUEUE_SIZE, 0); properties.put(ThreadPoolConfig.NAME_PREFIX, "test"); @@ -78,4 +82,35 @@ public void testCoroutines() { threadWorkerPool.init(); } + @Test + public void testVirtualThreads() { + Map properties = getProperties(); + properties.put(ThreadPoolConfig.USE_VIRTUAL_THREAD_PER_TASK_EXECUTOR, Boolean.TRUE); + PluginConfig poolPluginConfig = new PluginConfig("work_pool", ThreadWorkerPool.class, + properties); + ThreadWorkerPool threadWorkerPool = new ThreadWorkerPool(); + threadWorkerPool.setPluginConfig(poolPluginConfig); + try { + threadWorkerPool.init(); + } catch (TRpcExtensionException e) { + // not in java21+ + return; + } + PoolMXBean report = threadWorkerPool.report(); + ThreadPoolMXBean threadPoolMXBean = (ThreadPoolMXBean) report; + Assert.assertEquals(0, threadPoolMXBean.getPoolSize()); + Assert.assertEquals(0, threadPoolMXBean.getTaskCount()); + Assert.assertEquals(0, threadPoolMXBean.getCompletedTaskCount()); + Assert.assertEquals(0, threadPoolMXBean.getActiveThreadCount()); + Assert.assertEquals(WorkerPoolType.THREAD.getName(), threadPoolMXBean.getType()); + if (threadPoolMXBean instanceof ThreadPerTaskExecutorMXBeanImpl) { + Assert.assertEquals(0, threadPoolMXBean.getCorePoolSize()); + Assert.assertEquals(Integer.MAX_VALUE, threadPoolMXBean.getMaximumPoolSize()); + } else { + Assert.assertEquals(DEFALUT_POOL_SIZE, threadPoolMXBean.getCorePoolSize()); + Assert.assertEquals(DEFALUT_POOL_SIZE, threadPoolMXBean.getMaximumPoolSize()); + } + Assert.assertNotNull(report.toString()); + } + } \ No newline at end of file