Skip to content

Commit

Permalink
[ISSUE 4622] Checkstyle configured
Browse files Browse the repository at this point in the history
  • Loading branch information
yp969803 committed Nov 30, 2023
1 parent 0b0aa80 commit 371235a
Show file tree
Hide file tree
Showing 23 changed files with 40 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void run() {
queue.add(new FutureTaskExt<>(requestTask, null));

long headSlowTimeMills = 100;
Awaitility.await().pollDelay(Duration.ofMillis(headSlowTimeMills)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(headSlowTimeMills)).until(() -> true);
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.BrokerConfig;
Expand Down Expand Up @@ -133,7 +133,7 @@ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
} else if (invocation.getArgument(0) == nameserver2) {
return buildResponse(Boolean.FALSE);
} else if (invocation.getArgument(0) == nameserver3) {
Awaitility.await().pollDelay(Duration.ofMillis(timeOut+20)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(timeOut + 20)).until(() -> true);
return buildResponse(Boolean.TRUE);
}
return buildResponse(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
Expand Down Expand Up @@ -173,7 +173,7 @@ public void before() throws Exception {
autoSwitchHAService.init(defaultMessageStore);
replicasManager.start();
// execute schedulingSyncBrokerMetadata()
Awaitility.await().pollDelay(Duration.ofSeconds(SCHEDULE_SERVICE_EXEC_PERIOD)).until(()->true);
Awaitility.await().pollDelay(Duration.ofSeconds(SCHEDULE_SERVICE_EXEC_PERIOD)).until(() -> true);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs,
public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);

Awaitility.await().pollDelay(Duration.ofMillis(200)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(200)).until(() -> true);


// reset consumer;
Expand Down Expand Up @@ -299,7 +299,7 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception
public void testGetMessage_withFilterBitMap() throws Exception {
List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);

Awaitility.await().pollDelay(Duration.ofMillis(200)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(200)).until(() -> true);

for (int i = 0; i < topicCount; i++) {
String realTopic = TOPIC + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -55,7 +54,7 @@ public void run() {
RequestTask expiredRequest = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(expiredRequest, null));

Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true);

RequestTask requestTask = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(requestTask, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testBasic() throws Exception {
try {
assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, nextBeginOffset)).isTrue();
assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); // wait background threads of PopBufferMergeService run for some time
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); // wait background threads of PopBufferMergeService run for some time
assertThat(popBufferMergeService.addAk(reviveQid, ackMsg)).isTrue();
assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.util.HookUtils;
Expand Down Expand Up @@ -220,7 +219,7 @@ public void testDeliverDelayedMessageTimerTask() throws Exception {

// timer run maybe delay, then consumer message again
// and wait offsetTable
Awaitility.await().pollDelay(Duration.ofSeconds(15)).until(()->true);
Awaitility.await().pollDelay(Duration.ofSeconds(15)).until(() -> true);
scheduleMessageService.buildRunningStats(new HashMap<>());

messageResult = getMessage(realQueueId, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public void onChanged(String topic, Set<MessageQueue> messageQueues) {
Set<MessageQueue> set = new HashSet<>();
set.add(createMessageQueue());
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
Awaitility.await().pollDelay(Duration.ofMillis(11*1000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(11 * 1000)).until(() -> true);
assertThat(flag).isTrue();
} finally {
litePullConsumer.shutdown();
Expand Down Expand Up @@ -645,7 +645,7 @@ public void testConsumerAfterShutdown() throws Exception {

new AsyncConsumer().executeAsync(defaultLitePullConsumer);

Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true);
defaultLitePullConsumer.shutdown();
assertThat(defaultLitePullConsumer.isRunning()).isFalse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
assertThat(msgs.get(0).getBody()).isEqualTo(msgBody);
countDownLatch.countDown();
try {
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);
messageConsumedFlag.set(true);
} catch (Exception e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testRun100RandomCase() {
int queueSize = new Random().nextInt(20) + 1;//1-20
testAllocate(queueSize, consumerSize);
try {
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true);
} catch (Exception e) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();

Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);

ConsumeStatus stats = normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public void run() {
assertThat(responseMap).isNotNull();
while (!finish.get()) {
try {
Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(10)).until(() -> true);
} catch (ConditionTimeoutException e) {
}
MessageExt responseMsg = new MessageExt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void run() {
if (executor.getCompletedTaskCount() == 10) {
break;
}
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);
}
// simulate schedule task execution , tps stat
{
Expand Down Expand Up @@ -115,7 +115,7 @@ public void run() {
if (executor.getCompletedTaskCount() == 10) {
break;
}
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);
}
return statsItemSet.getStatsItem("topicTest").getValue();
}
Expand All @@ -136,7 +136,7 @@ public void run() {
if (executor.getCompletedTaskCount() == 10) {
break;
}
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);
}
return statsItemSet.getAndCreateStatsItem("test").getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void testBrokerLifecycleListener() throws Exception {
assertEquals(DEFAULT_BROKER_NAME, brokerName);
atomicBoolean.set(true);
});
Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(() -> true);
assertTrue(atomicBoolean.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ BitsArray gen(int bitCount) {
for (int i = 0; i < bitCount / Byte.SIZE; i++) {
bitsArray.setByte(i, (byte) (new Random(System.currentTimeMillis())).nextInt(0xff));
try {
Awaitility.await().pollDelay(Duration.ofMillis(2)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(2)).until(() -> true);
} catch (ConditionTimeoutException e) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
Expand Down Expand Up @@ -169,7 +168,7 @@ public void testNamesrv() throws Exception {
RemotingCommand response = clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
remotingCommand);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
Awaitility.await().pollDelay(Duration.ofSeconds(waitSecondsForService+1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofSeconds(waitSecondsForService + 1)).until(() -> true);
response = clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class), remotingCommand);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void testTopicRouteCaffeineCache() throws InterruptedException {
}
});
assertThat(value).isEqualTo(topicCache.get(key));
Awaitility.await().pollDelay(Duration.ofSeconds(5)).until(()->true);
Awaitility.await().pollDelay(Duration.ofSeconds(5)).until(() -> true);
assertThat(value).isEqualTo(topicCache.get(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static boolean createSub(String nameSrvAddr, String clusterName, String c
addr));
} catch (Exception e) {
e.printStackTrace();
Awaitility.await().pollDelay(Duration.ofMillis(1000*1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1000 * 1)).until(() -> true);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -173,7 +173,7 @@ public static boolean awaitStaticTopicMs(long timeMs, String topic, DefaultMQAdm
if (checkStaticTopic(topic, defaultMQAdminExt, clientInstance)) {
return true;
}
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
Expand Down Expand Up @@ -62,23 +62,23 @@ public static String addQuoteToParamater(String param) {

public static void waitForMonment(long time) {
try {
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true);
} catch (ConditionTimeoutException e) {
e.printStackTrace();
}
}

public static void waitForSeconds(long time) {
try {
Awaitility.await().pollDelay(Duration.ofSeconds(time)).until(()->true);
Awaitility.await().pollDelay(Duration.ofSeconds(time)).until(() -> true);
} catch (ConditionTimeoutException e) {
e.printStackTrace();
}
}

public static void waitForMinutes(long time) {
try {
Awaitility.await().pollDelay(Duration.ofMinutes(time)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMinutes(time)).until(() -> true);
} catch (ConditionTimeoutException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
package org.apache.rocketmq.test.util;

import java.time.Duration;
import java.util.concurrent.TimeUnit;


import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;

import io.opentelemetry.sdk.metrics.internal.state.DebugUtils;

public class TestUtils {
public static void waitForMoment(long time) {
try {
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true);
} catch (ConditionTimeoutException var3) {
var3.printStackTrace();
}
Expand All @@ -37,7 +36,7 @@ public static void waitForMoment(long time) {

public static void waitForSeconds(long time) {
try {
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true);
} catch (ConditionTimeoutException var3) {
var3.printStackTrace();
}
Expand All @@ -46,7 +45,7 @@ public static void waitForSeconds(long time) {

public static void waitForMinutes(long time) {
try {
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true);
} catch (ConditionTimeoutException var3) {
var3.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void concurrentPutTest() throws InterruptedException {
Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
try {
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true);
} catch (ConditionTimeoutException ignored) {
}
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void basicServiceTest() throws InterruptedException {
for (int i = 0; i < 50; i++) {
Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, i * 100, MESSAGE_SIZE, System.currentTimeMillis()));
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true);
}
ConcurrentSkipListMap<Long, IndexFile> timeStoreTable = indexService.getTimeStoreTable();
Assert.assertEquals(3, timeStoreTable.size());
Expand Down Expand Up @@ -213,7 +213,7 @@ public void restartServiceTest() throws InterruptedException {
TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)),
i * 100L, MESSAGE_SIZE, System.currentTimeMillis());
Assert.assertEquals(AppendResult.SUCCESS, result);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true);
}
long timestamp = indexService.getTimeStoreTable().firstKey();
indexService.shutdown();
Expand Down Expand Up @@ -246,7 +246,7 @@ public void queryFromFileTest() throws InterruptedException, ExecutionException
TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(j)),
i * 100L + j, MESSAGE_SIZE, System.currentTimeMillis());
Assert.assertEquals(AppendResult.SUCCESS, result);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true);
}
}

Expand Down Expand Up @@ -280,7 +280,7 @@ public void concurrentGetTest() throws InterruptedException {
indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)),
i * 100L, i * 100, System.currentTimeMillis());
}
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true);
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true);
}

CountDownLatch latch = new CountDownLatch(fileCount * 3);
Expand Down
Loading

0 comments on commit 371235a

Please sign in to comment.