-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Plans for migrate topic metadata beyond partition count #20
Comments
It's something we would like to add, but don't have any plans to work on it in the short term. We would certainly be happy to accept any contributions adding support for this. |
Just chiming in here to document my notes. I wrote this in a separate project to get topic configurations alongside the partition count in the TopicDescription object. /**
* Wrapper around {@link AdminClient#describeTopics(Collection)} and {@link AdminClient#describeConfigs(Collection)}
* for a single topic.
*
* @param adminClient a {@link AdminClient}
* @param topic the topic to describe
* @return {@link KafkaFuture} with the result of the topic description
*/
private static KafkaFuture<TopicConfigDescription> whenTopicIsDescribed(AdminClient adminClient, String topic) {
Optional<TopicDescription> topicDescription;
Optional<List<ConfigEntry>> dynamicTopicConfigEntries;
try {
topicDescription = Optional.of(adminClient.describeTopics(Collections.singletonList(topic)).all()
.thenApply(topicDescriptionMap -> topicDescriptionMap.get(topic))
.get());
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
dynamicTopicConfigEntries = Optional.of(adminClient.describeConfigs(Collections.singletonList(resource))
.all()
.thenApply(configMap -> configMap.get(resource).entries()
.stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
.collect(toList())
)
.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Unable to get topic description");
}
return KafkaFuture.completedFuture(new TopicConfigDescription(
topicDescription.orElseThrow(() -> new RuntimeException("Unable to get topic description")),
dynamicTopicConfigEntries.orElseThrow(() -> new RuntimeException("Unable to get topic configurations"))
));
}
static class TopicConfigDescription {
final TopicDescription description;
final List<ConfigEntry> topicConfigs;
public TopicConfigDescription(TopicDescription description, List<ConfigEntry> topicConfigs) {
this.description = description;
this.topicConfigs = topicConfigs;
}
public String getTopicName() {
return description.name();
}
} |
In progress changes to add in Would be interested to know how the monitor thread works exactly... For example, I assume the |
Yes, List can contain partitions multiple topics. This happens in
|
Right, so I guess my question is, should I modify this line to map out instead to an "enriched" .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) Just thinking aloud here as to the best place to get access to the topic configs |
Thinking more about previous suggestion, |
I had a look around and I believe the tool is able to identify if the partition count between source and destination topic is the same.
Is there any plans to be able to replicate other topic metadata like retention, max message size, etc?
Thanks.
The text was updated successfully, but these errors were encountered: