Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
qianye1001 committed Jan 10, 2025
1 parent 3f846a0 commit 5893e9d
Showing 1 changed file with 76 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
Expand All @@ -55,45 +56,56 @@ public String commandName() {
@Override
public String commandDesc() {
return "Convert RocksDB kv config (topics/subscriptionGroups/consumerOffsets) to json. " +
"Use [-n, -c, -t] to send Request to broker ( version >= 5.3.2 ) or use [-p, -t, -e (not require)] load RocksDB. " +
"[rpc mode] Use [-n, -c, -b, -t] to send Request to broker ( version >= 5.3.2 ) or [local mode] use [-p, -t, -j, -e] to load RocksDB. " +
"If -e is provided, tools will export json file instead of std print";
}

@Override
public Options buildCommandlineOptions(Options options) {
Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
"topics/subscriptionGroups/consumerOffsets. Required in local mode and default all in rpc mode.");
options.addOption(configTypeOption);

// [local mode] options
Option pathOption = new Option("p", "configPath", true,
"Absolute path to the metadata config directory");
options.addOption(pathOption);

Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
"topics/subscriptionGroups/consumerOffsets");
options.addOption(configTypeOption);

Option exportPathOption = new Option("e", "exportFile", true,
"Absolute file path for exporting, auto backup existing file, not directory!");
"Absolute file path for exporting, auto backup existing file, not directory. If exportFile is provided, will export Json file and ignore [-j].");
options.addOption(exportPathOption);

Option jsonEnableOption = new Option("j", "jsonEnable", true,
"Json format enable, Default: true. If exportFile is provided, will export Json file and ignore [-j].");
options.addOption(jsonEnableOption);

// [rpc mode] options
Option nameserverOption = new Option("n", "nameserverAddr", true,
"nameserverAddr. If nameserverAddr and clusterName are provided, will ignore [-p, -t, -e] args");
"nameserverAddr. If nameserverAddr and ( clusterName or brokerAddr) are provided, will ignore [-p, -e, -j] args");
options.addOption(nameserverOption);

Option clusterOption = new Option("c", "cluster", true,
"Cluster name. If nameserverAddr and clusterName are provided, will ignore [-p, -t, -e] args");
"Cluster name. If nameserverAddr and clusterName are provided, will ignore [-p, -e, -j, -b] args");
options.addOption(clusterOption);

Option brokerAddrOption = new Option("b", "brokerAddr", true,
"Broker address. If nameserverAddr and brokerAddr are provided, will ignore [-p, -e, -j] args");
options.addOption(brokerAddrOption);

return options;
}

@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
// call broker to export to json
// [rpc mode] call broker to export to json file
if (commandLine.hasOption("nameserverAddr")) {
if (!commandLine.hasOption("cluster")) {
System.out.print("need [-c] to send request to broker. \n");
System.out.print("Use [rpc mode] \n");
if (!commandLine.hasOption("cluster") && !commandLine.hasOption("brokerAddr")) {
System.out.print("need [-c] or [-b] arg to send request to broker. \n");
return;
}
List<RocksDBConfigToJsonRequestHeader.ConfigType> typeList = new ArrayList<>();
if (commandLine.hasOption("configPath")) {
if (commandLine.hasOption("configType")) {
String configType = commandLine.getOptionValue("configType").trim();
try {
typeList.addAll(RocksDBConfigToJsonRequestHeader.ConfigType.fromString(configType));
Expand All @@ -112,6 +124,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t

// load rocksdb to print or export json
if (commandLine.hasOption("configPath")) {
System.out.print("Use [local mode]");
if (!commandLine.hasOption("configType")) {
System.out.print("need [-t] to Specify config type \n");
return;
Expand Down Expand Up @@ -143,7 +156,17 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
Map<String, JSONObject> configMap = getConfigMapFromRocksDB(path, type);
if (configMap != null) {
if (exportFile == null) {
System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
if (commandLine.hasOption("jsonEnable") && "false".equalsIgnoreCase(commandLine.getOptionValue("jsonEnable").trim())) {
AtomicLong count = new AtomicLong(0);
for (Map.Entry<String, JSONObject> entry : configMap.entrySet()) {
String configKey = entry.getKey();
System.out.printf("type: %s", configKey);
JSONObject jsonObject = entry.getValue();
jsonObject.forEach((k, v) -> System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), k, v));
}
} else {
System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
}
} else {
String jsonString = JSONObject.toJSONString(configMap, true);
try {
Expand All @@ -153,6 +176,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
}
}
}
return;
}
System.out.print(commandDesc() + "\n");
}
Expand All @@ -161,7 +185,7 @@ private static Map<String, JSONObject> getConfigMapFromRocksDB(String path,
RocksDBConfigToJsonRequestHeader.ConfigType configType) {

if (RocksDBConfigToJsonRequestHeader.ConfigType.consumerOffsets.equals(configType)) {
return printConsumerOffsets(path);
return loadConsumerOffsets(path);
}

ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
Expand Down Expand Up @@ -204,50 +228,59 @@ private static Map<String, JSONObject> getConfigMapFromRocksDB(String path,
private void callBrokerToExport(CommandLine commandLine, RPCHook rpcHook,
List<RocksDBConfigToJsonRequestHeader.ConfigType> type) {
String nameserverAddr = commandLine.getOptionValue("nameserverAddr").trim();
String clusterName = commandLine.getOptionValue("cluster").trim();

String inputBrokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null;
String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;

DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 30 * 1000);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.setNamesrvAddr(nameserverAddr);

List<CompletableFuture<Void>> futureList = new ArrayList<>();

try {
defaultMQAdminExt.start();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
if (clusterAddrTable.get(clusterName) == null) {
System.out.print("clusterAddrTable is empty");
return;
}
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (Map.Entry<String, BrokerData> entry : brokerAddrTable.entrySet()) {
String brokerName = entry.getKey();
BrokerData brokerData = entry.getValue();
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
try {
defaultMQAdminExt.exportRocksDBConfigToJson(brokerAddr, type);
} catch (Throwable t) {
System.out.print(brokerName + " export error");
throw new CompletionException(this.getClass().getSimpleName() + " command failed", t);
}
return null;
});
futureList.add(future);
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete(
(v, t) -> {
System.out.print("broker export done.");
if (clusterName != null) {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
if (clusterAddrTable.get(clusterName) == null) {
System.out.print("clusterAddrTable is empty");
return;
}
);
for (Map.Entry<String, BrokerData> entry : brokerAddrTable.entrySet()) {
String brokerName = entry.getKey();
BrokerData brokerData = entry.getValue();
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
futureList.add(sendRequest(type, defaultMQAdminExt, brokerAddr, brokerName));
}
} else if (inputBrokerAddr != null) {
futureList.add(sendRequest(type, defaultMQAdminExt, inputBrokerAddr, null));
}
} catch (Exception e) {
throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete(
(v, t) -> System.out.print("broker export done.")
).join();
}

private CompletableFuture<Void> sendRequest(List<RocksDBConfigToJsonRequestHeader.ConfigType> type,
DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, String brokerName) {
return CompletableFuture.supplyAsync(() -> {
try {
defaultMQAdminExt.exportRocksDBConfigToJson(brokerAddr, type);
} catch (Throwable t) {
System.out.print((brokerName != null) ? brokerName : brokerAddr + " export error");
throw new CompletionException(this.getClass().getSimpleName() + " command failed", t);
}
return null;
});
}

private static Map<String, JSONObject> printConsumerOffsets(String path) {
private static Map<String, JSONObject> loadConsumerOffsets(String path) {
ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
configRocksDBStorage.start();
RocksIterator iterator = configRocksDBStorage.iterator();
Expand Down

0 comments on commit 5893e9d

Please sign in to comment.