From f6cc7d0e9f3afc732b54c4c4aa4d82d1747f0573 Mon Sep 17 00:00:00 2001 From: Willhow <2044783677@qq.com> Date: Mon, 16 Dec 2024 22:31:09 +0800 Subject: [PATCH 1/2] [ISSUE #9044]Supplemental checks that are missing from the UpdateSubGroup operation --- .../processor/AdminBrokerProcessor.java | 17 +++++++++- .../processor/AdminBrokerProcessorTest.java | 33 ++++++++++++++----- .../remoting/protocol/ResponseCode.java | 2 ++ 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index ffac714c1ba..b16fcc9ddf7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1615,6 +1615,9 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class); if (config != null) { + if (!updateSubGroupPreCheck(config, response)) { + return response; + } this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config); } @@ -1638,7 +1641,11 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte final List groupConfigList = subscriptionGroupList.getGroupConfigList(); final StringBuilder builder = new StringBuilder(); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); for (SubscriptionGroupConfig config : groupConfigList) { + if (!updateSubGroupPreCheck(config, response)) { + return response; + } builder.append(config.getGroupName()).append(";"); } final String groupNames = builder.toString(); @@ -1646,7 +1653,6 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte groupNames, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - final RemotingCommand response = RemotingCommand.createResponseCommand(null); try { this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList); response.setCode(ResponseCode.SUCCESS); @@ -1665,6 +1671,15 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte return response; } + private boolean updateSubGroupPreCheck(SubscriptionGroupConfig config,RemotingCommand resp) { + if (StringUtils.isBlank(config.getGroupName())) { + resp.setCode(ResponseCode.ILLEGAL_ARGUMENT); + resp.setRemark("The subscription group name cannot be empty"); + return false; + } + return true; + } + private void initConsumerOffset(String clientHost, String groupName, int mode, TopicConfig topicConfig) throws ConsumeQueueException { String topic = topicConfig.getTopicName(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 959b147d9d3..0989f847703 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -65,14 +65,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.body.AclInfo; -import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; -import org.apache.rocketmq.remoting.protocol.body.GroupList; -import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; -import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; -import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; -import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.body.*; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; @@ -633,6 +626,30 @@ public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandExcepti assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testEmptyNameWhenUpdateSubGroup() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(""); + request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes()); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_ARGUMENT); + } + + @Test + public void testEmptyNameWhenUpdateSubGroupList() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST, null); + SubscriptionGroupList groupList = new SubscriptionGroupList(); + List list = new ArrayList<>(); + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(""); + list.add(subscriptionGroupConfig); + groupList.setGroupConfigList(list); + request.setBody(JSON.toJSON(groupList).toString().getBytes()); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_ARGUMENT); + } + @Test public void testGetAllSubscriptionGroupInRocksdb() throws Exception { initRocksdbSubscriptionManager(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java index e2ce81d95b9..9d1afc0f98b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java @@ -96,6 +96,8 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int ILLEGAL_OPERATION = 604; + public static final int ILLEGAL_ARGUMENT = 605; + public static final int RPC_UNKNOWN = -1000; public static final int RPC_ADDR_IS_NULL = -1002; public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004; From 820109a1a1676aa0332ba927ea3126f305ee0d3a Mon Sep 17 00:00:00 2001 From: Willhow <2044783677@qq.com> Date: Fri, 20 Dec 2024 20:32:10 +0800 Subject: [PATCH 2/2] [ISSUE #9044]fix codeStyle --- .../broker/processor/AdminBrokerProcessorTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 0989f847703..f7f7bbd796f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -65,7 +65,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.body.*; +import org.apache.rocketmq.remoting.protocol.body.AclInfo; +import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; +import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;