Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fiber refactorings #2

Open
wants to merge 21 commits into
base: java21
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void when_repeatedBidirectionalStreaming_withFaultyService() throws IOExc

private static Server createServer(BindableService service) throws IOException {
Server server = ServerBuilder.forPort(0)
.executor(Executors.newFixedThreadPool(4))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.addService(service)
.build();
server.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ public void testConcurrentPutAndOrderbyQueries() {
map.addIndex(indexConfig);

int threadsCount = RuntimeAvailableProcessors.get() - 2;
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

int keysPerThread = 5000;
CountDownLatch latch = new CountDownLatch(threadsCount);
Expand Down Expand Up @@ -683,7 +683,7 @@ public void testConcurrentUpdateAndOrderbyQueries() {
map.addIndex(indexConfig);

int threadsCount = RuntimeAvailableProcessors.get() - 2;
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

int keysPerThread = 2500;
CountDownLatch latch = new CountDownLatch(threadsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void setUp() throws Exception {
public void selectAllFromTableWhereIdColumnParallel() throws Exception {
int repeatCount = 10000;
Future<?>[] futures = new Future[THREAD_COUNT];
var pool = Executors.newFixedThreadPool(THREAD_COUNT);
var pool = Executors.newVirtualThreadPerTaskExecutor();
try {
for (int i = 0; i < THREAD_COUNT; ++i) {
futures[i] = pool.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void test_indexScan() throws InterruptedException {

for (int i = 0, threadsLength = threads.length; i < threadsLength; i++) {
HazelcastInstance inst = createHazelcastClient();
threads[i] = new Thread(() -> {
threads[i] = Thread.ofVirtual().unstarted(() -> {
int numQueriesLocal = 0;
while (!done.get()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void when_addingElementsToCacheMultiThreaded_then_minProperSizeAndElement
};

List<Thread> threadList = IntStream.range(0, threadCount)
.mapToObj(value -> new Thread(runnable))
.mapToObj(value -> Thread.ofVirtual().unstarted(runnable))
.collect(Collectors.toList());
threadList.forEach(Thread::start);
threadList.forEach((ConsumerEx<Thread>) Thread::join);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ protected void handleCommand(String commandInputted) {
println("ops/s = " + repeat * ONE_THOUSAND / (Clock.currentTimeMillis() - t0));
} else if (first.startsWith("&") && first.length() > 1) {
final int fork = Integer.parseInt(first.substring(1));
ExecutorService pool = Executors.newFixedThreadPool(fork);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
final String threadCommand = command.substring(first.length());
for (int i = 0; i < fork; i++) {
final int threadID = i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void handleCommand(String inputCommand) {
} else if (first.startsWith("&") && first.length() > 1) {
final int fork = Integer.parseInt(first.substring(1));
final String threadCommand = command.substring(first.length());
ExecutorService pool = Executors.newFixedThreadPool(fork);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < fork; i++) {
final int threadID = i;
pool.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
final class DnsEndpointResolver
extends HazelcastKubernetesDiscoveryStrategy.EndpointResolver {
// executor service for dns lookup calls
private static final ExecutorService DNS_LOOKUP_SERVICE = Executors.newCachedThreadPool();
private static final ExecutorService DNS_LOOKUP_SERVICE = Executors.newVirtualThreadPerTaskExecutor();

private final String serviceDns;
private final int port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void concurrentCacheCreation() throws InterruptedException {
errorCounter.incrementAndGet();
}
};
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < threadCount; i++) {
executorService.submit(getCache);
}
Expand All @@ -119,7 +119,7 @@ public void concurrentCacheCreation() throws InterruptedException {

@Test
public void createOrGetConcurrentlySingleCache_fromMultiProviders() {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
Expand All @@ -136,7 +136,7 @@ public void createOrGetConcurrentlySingleCache_fromMultiProviders() {

@Test
public void createConcurrentlyMultipleCaches_fromMultipleProviders() {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testSyncListener() throws Exception {
final CountDownLatch latch = new CountDownLatch(threadCount);
final AtomicBoolean shutdown = new AtomicBoolean(false);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
Thread.ofVirtual().start(() -> {
Random rand = new Random();
for (int i1 = 0; i1 < putCount && !shutdown.get(); i1++) {
String key = String.valueOf(rand.nextInt(putCount));
Expand All @@ -98,7 +98,7 @@ public void testSyncListener() throws Exception {
actualPutCount.incrementAndGet();
}
latch.countDown();
}).start();
});
}

if (!latch.await(ASSERT_TRUE_EVENTUALLY_TIMEOUT, TimeUnit.SECONDS)) {
Expand Down Expand Up @@ -291,7 +291,7 @@ private void testSyncListener_shouldNotHang_AfterAction(String cacheName, Cachin
int threads = 4;
final CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
Thread.ofVirtual().start(() -> {
Random rand = new Random();
while (true) {
try {
Expand All @@ -301,7 +301,7 @@ private void testSyncListener_shouldNotHang_AfterAction(String cacheName, Cachin
}
}
latch.countDown();
}).start();
});
}

// wait a little for putting threads to start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testPutCacheConfigConcurrently()
throws ExecutionException, InterruptedException {
CacheService cacheService = new TestCacheService(mockNodeEngine, false);

executorService = Executors.newFixedThreadPool(CONCURRENCY);
executorService = Executors.newVirtualThreadPerTaskExecutor();
List<Future<CacheConfig>> futures = new ArrayList<>();
for (int i = 0; i < CONCURRENCY; i++) {
futures.add(
Expand Down Expand Up @@ -130,7 +130,7 @@ public void testPutCacheConfigConcurrently()
public void testPutCacheConfigConcurrently_whenExceptionThrownFromAdditionalSetup() {
CacheService cacheService = new TestCacheService(mockNodeEngine, true);

executorService = Executors.newFixedThreadPool(CONCURRENCY);
executorService = Executors.newVirtualThreadPerTaskExecutor();
List<Future<CacheConfig>> futures = new ArrayList<>();
for (int i = 0; i < CONCURRENCY; i++) {
futures.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testCardinalityEstimatorSpawnNodeInParallel() {
final String name = "testSpawnNodeInParallel";
CardinalityEstimator estimator = instance.getCardinalityEstimator(name);
estimator.add(1L);
final ExecutorService ex = Executors.newFixedThreadPool(parallel);
final ExecutorService ex = Executors.newVirtualThreadPerTaskExecutor();
try {
for (int i = 0; i < total / parallel; i++) {
final HazelcastInstance[] instances = new HazelcastInstance[parallel];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ public void testClusterShutdown_thenCheckOperationsNotHanging() throws Exception
CountDownLatch testFinishedLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(() -> {
Thread thread = Thread.ofVirtual().unstarted(() -> {
try {
for (int i1 = 0; i1 < mapSize; i1++) {
if (i1 == mapSize / 4) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testClientPortConnection() {
@Test
public void testClientConnectionBeforeServerReady() {
String clusterName = randomString();
ExecutorService executorService = Executors.newFixedThreadPool(2);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
executorService.submit(() -> {
Config config = new Config();
config.setClusterName(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void testClientListenerDisconnected() {
clients.add(client);
}

ExecutorService ex = Executors.newFixedThreadPool(4);
ExecutorService ex = Executors.newVirtualThreadPerTaskExecutor();
try {
for (final HazelcastInstance client : clients) {
ex.execute(client::shutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static void main(String[] args) {
System.out.println(" Get Percentage: " + getPercentage);
System.out.println(" Put Percentage: " + putPercentage);
System.out.println(" Remove Percentage: " + (100 - (putPercentage + getPercentage)));
ExecutorService es = Executors.newFixedThreadPool(threadCount);
ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < threadCount; i++) {
es.submit((Runnable) () -> {
IMap<String, Object> map = client.getMap("default");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private static void runTest(ICache<Integer, Integer> icacheOnClient,
}
};

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

int numOfGetters = 2 * RuntimeAvailableProcessors.get();
for (int i = 0; i < numOfGetters; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
ArrayList<Thread> threads = new ArrayList<>();

// continuously adds and removes member
Thread shadowMember = new Thread(() -> {
Thread shadowMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
HazelcastInstance member = hazelcastFactory.newHazelcastInstance(config);
sleepSeconds(5);
Expand All @@ -138,7 +138,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {

for (int i = 0; i < NEAR_CACHE_POPULATE_THREAD_COUNT; i++) {
// populates client Near Cache
Thread populateClientNearCache = new Thread(() -> {
Thread populateClientNearCache = Thread.ofVirtual().unstarted(() -> {
int key = 0;
while (!stopTest.get()) {
clientCache.get(key++);
Expand All @@ -151,7 +151,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
}

// updates data from member
Thread putFromMember = new Thread(() -> {
Thread putFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
int key = getInt(KEY_COUNT);
int value = getInt(Integer.MAX_VALUE);
Expand All @@ -162,7 +162,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
});
threads.add(putFromMember);

Thread clearFromMember = new Thread(() -> {
Thread clearFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
memberCache.clear();
sleepSeconds(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testClusterShutdownDuringMapPutAll() {
final CountDownLatch threadsFinished = new CountDownLatch(numThreads);
final CountDownLatch threadsStarted = new CountDownLatch(numThreads);

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < numThreads; i++) {
executor.execute(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.hazelcast.internal.nearcache.impl.NearCacheTestUtils.getBaseConfig;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
import static junit.framework.TestCase.assertNull;

@RunWith(HazelcastParallelClassRunner.class)
Expand All @@ -63,7 +63,7 @@ public void testDestroyAndCreateProxyWithNearCache() {

int createPutGetThreadCount = 2;
int destroyThreadCount = 2;
ExecutorService pool = newFixedThreadPool(createPutGetThreadCount + destroyThreadCount);
ExecutorService pool = newVirtualThreadPerTaskExecutor();

final AtomicBoolean isRunning = new AtomicBoolean(true);
final AtomicReference<Exception> exception = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void tearDown() throws Exception {

@Test
public void stress_stats_by_doing_put_and_remove() throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(4);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
pool.execute(new Put());
pool.execute(new Put());
pool.execute(new Remove());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
ArrayList<Thread> threads = new ArrayList<>();

// continuously adds and removes member
Thread shadowMember = new Thread(() -> {
Thread shadowMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
HazelcastInstance member1 = factory.newHazelcastInstance(config);
sleepSeconds(5);
Expand All @@ -97,7 +97,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {

// populates client Near Cache
for (int i = 0; i < NEAR_CACHE_POPULATE_THREAD_COUNT; i++) {
Thread populateClientNearCache = new Thread(() -> {
Thread populateClientNearCache = Thread.ofVirtual().unstarted(() -> {
int key = 0;
while (!stopTest.get()) {
clientMap.get(key++);
Expand All @@ -111,7 +111,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
}

// updates map data from member
Thread putFromMember = new Thread(() -> {
Thread putFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
int key = getInt(KEY_COUNT);
int value = getInt(Integer.MAX_VALUE);
Expand All @@ -122,7 +122,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
});
threads.add(putFromMember);

Thread clearFromMember = new Thread(() -> {
Thread clearFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
memberMap.clear();
sleepSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void stress_user_listener_removal_upon_query_cache_destroy() throws Inter
final AtomicBoolean stop = new AtomicBoolean(false);
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < STRESS_TEST_THREAD_COUNT; i++) {
// Refactoring this to Thread.ofVirtual().unstarted(...) causes deadlock due to synchronization/locking
Thread thread = new Thread(() -> {
while (!stop.get()) {
String name = mapNames[getInt(0, 4)];
Expand Down Expand Up @@ -177,6 +178,7 @@ public void event_service_is_empty_after_queryCache_concurrent_destroy() throws
final AtomicBoolean stop = new AtomicBoolean(false);
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < STRESS_TEST_THREAD_COUNT; i++) {
// Refactoring this to Thread.ofVirtual().unstarted(...) causes deadlock due to synchronization/locking
Thread thread = new Thread(() -> {
while (!stop.get()) {
QueryCache queryCache = map.getQueryCache("a", Predicates.alwaysTrue(), true);
Expand Down Expand Up @@ -238,6 +240,7 @@ public void no_query_cache_left_after_creating_and_destroying_same_map_concurren
final HazelcastInstance client = factory.newHazelcastClient();
final String mapName = "test";

// Refactoring this to Executors.newVirtualThreadPerTaskExecutor() causes deadlock due to synchronization/locking
ExecutorService pool = Executors.newFixedThreadPool(STRESS_TEST_THREAD_COUNT);
final AtomicBoolean stop = new AtomicBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void main(String[] args) {
private static void test1() {
final Random rnd = new Random();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
Thread.ofVirtual().start(() -> {
while (true) {
int random = rnd.nextInt(100);
if (random > 54) {
Expand All @@ -65,10 +65,10 @@ private static void test1() {
TOTAL_PEEK.incrementAndGet();
}
}
}).start();
});
}

new Thread(() -> {
Thread.ofVirtual().start(() -> {
while (true) {
try {
int size = queue.size();
Expand All @@ -86,7 +86,7 @@ private static void test1() {
e.printStackTrace();
}
}
}).start();
});

while (true) {
int sleepTime = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testConcurrentTxnPut() throws Exception {
final MultiMap multiMap = client.getMultiMap(mapName);

final int threads = 10;
final ExecutorService ex = Executors.newFixedThreadPool(threads);
final ExecutorService ex = Executors.newVirtualThreadPerTaskExecutor();
final CountDownLatch latch = new CountDownLatch(threads);
final AtomicReference<Throwable> error = new AtomicReference<>(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public void tearDown() {
public void testCommitConcurrently() {
int count = 10000;
String name = randomString();
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorServiceForCommit = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
ExecutorService executorServiceForCommit = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < count; i++) {
XATransactionRunnable runnable = new XATransactionRunnable(xaResource, name, executorServiceForCommit, i);
executorService.execute(runnable);
Expand Down
Loading