Skip to content

Commit

Permalink
排序
Browse files Browse the repository at this point in the history
  • Loading branch information
freakchick committed Jun 25, 2021
1 parent fd69818 commit 24b7e34
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
39 changes: 17 additions & 22 deletions src/main/java/com/jq/kafkaui/util/KafkaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Field;

import java.util.*;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -258,7 +259,7 @@ public static ResponseDto clusterInfo(String broker) {

}

public static ResponseDto getAllGroups(String broker,String keyword) {
public static ResponseDto getAllGroups(String broker, String keyword) {
AdminClient client = null;
try {
client = createAdminClientByProperties(broker);
Expand All @@ -269,8 +270,8 @@ public static ResponseDto getAllGroups(String broker,String keyword) {
jo.put("name", t.groupId());
return jo;
}).collect(Collectors.toList());
if (keyword != null){
collect = collect.stream().filter(t->t.getString("name").contains(keyword)).collect(Collectors.toList());
if (keyword != null) {
collect = collect.stream().filter(t -> t.getString("name").contains(keyword)).collect(Collectors.toList());
}

return ResponseDto.success(collect);
Expand Down Expand Up @@ -331,7 +332,7 @@ public static ResponseDto getGroupInfo(String broker, String group) {
KafkaConsumer<String, String> consumer = getConsumer(broker, topics, group, "earliest");
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

List<JSONObject> collect1 = topicPartitions.stream().map(t -> {
List<JSONObject> collect = topicPartitions.stream().map(t -> {
OffsetAndMetadata offsetAndMetadata = topicPartitionOffsetAndMetadataMap.get(t);
long offset = offsetAndMetadata.offset();
JSONObject jsonObject = new JSONObject();
Expand All @@ -343,26 +344,12 @@ public static ResponseDto getGroupInfo(String broker, String group) {
jsonObject.put("endOffset", endOffset);
jsonObject.put("lag", endOffset - offset);
return jsonObject;
}).sorted(Comparator.comparing(o -> o.getString("topic")))
}).sorted(Comparator.comparing(KafkaUtil::comparingByName).thenComparing(KafkaUtil::comparingByPartition))
.collect(Collectors.toList());

/* Map<String, List<JSONObject>> collect = topicPartitions.stream().map(t -> {
OffsetAndMetadata offsetAndMetadata = topicPartitionOffsetAndMetadataMap.get(t);
long offset = offsetAndMetadata.offset();
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", t.topic());
jsonObject.put("partition", t.partition());
jsonObject.put("offset", offset);
TopicPartition topicPartition = new TopicPartition(t.topic(), t.partition());
Long endOffset = endOffsets.get(topicPartition);
jsonObject.put("endOffset", endOffset);
jsonObject.put("lag", endOffset - offset);
return jsonObject;
}).collect(Collectors.groupingBy(t -> {
return t.getString("topic");
}));*/
// Collection<List<JSONObject>> values1 = collect.values();
return ResponseDto.success(collect1);


return ResponseDto.success(collect);
} catch (Exception e) {
return ResponseDto.fail(e.getMessage());
} finally {
Expand All @@ -371,6 +358,14 @@ public static ResponseDto getGroupInfo(String broker, String group) {

}

private static String comparingByName(JSONObject jo){
return jo.getString("topic");
}

private static Integer comparingByPartition(JSONObject jo){
return jo.getInteger("partition");
}

public static ResponseDto deleteGroup(String broker, String group) {
AdminClient client = null;
try {
Expand Down
9 changes: 4 additions & 5 deletions src/main/webapp/src/components/common/GroupTable.vue
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
<div>
<vxe-table
border
stripe
resizable
:span-method="mergeRowMethod"
:data="data">
<!-- <vxe-table-column type="seq" width="60"></vxe-table-column>-->
<vxe-table-column field="topic" title="topic"></vxe-table-column>
<vxe-table-column field="partition" title="分区号"></vxe-table-column>
<vxe-table-column field="offset" title="消费偏移量"></vxe-table-column>
<vxe-table-column field="lag" title="未消费消息条数"></vxe-table-column>
<vxe-table-column field="topic" title="topic" align="center"></vxe-table-column>
<vxe-table-column field="partition" title="分区号" align="center"></vxe-table-column>
<vxe-table-column field="offset" title="消费偏移量" align="center"></vxe-table-column>
<vxe-table-column field="lag" title="未消费消息条数" align="center"></vxe-table-column>
</vxe-table>
</div>
</template>
Expand Down

0 comments on commit 24b7e34

Please sign in to comment.