Skip to content

Commit

Permalink
Initial support for managed KafkaTopic resources (streamshub#504)
Browse files Browse the repository at this point in the history
* Initial support for managed KafkaTopic resources

Signed-off-by: Michael Edgar <[email protected]>

* Managed label

* Integration tests to check value of `.meta.managed`, disable CR patch

Signed-off-by: Michael Edgar <[email protected]>

---------

Signed-off-by: Michael Edgar <[email protected]>
Co-authored-by: Riccardo Forina <[email protected]>
  • Loading branch information
MikeEdgar and riccardo-forina authored Feb 29, 2024
1 parent 1de2e45 commit 00b9665
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 62 deletions.
2 changes: 1 addition & 1 deletion api/.checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@

<module name="ClassFanOutComplexity">
<!-- default is 20 -->
<property name="max" value="44"/>
<property name="max" value="50"/>
<property name="excludedPackages" value="java.time,javax.json,javax.ws.rs,javax.ws.rs.core"/>
</module>
<module name="CyclomaticComplexity">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
Expand Down Expand Up @@ -59,7 +60,10 @@
import com.github.eyefloaters.console.api.support.UnknownTopicIdPatch;
import com.github.eyefloaters.console.api.support.ValidationProxy;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaTopic;

import static org.apache.kafka.clients.admin.NewPartitions.increaseTo;

Expand Down Expand Up @@ -102,6 +106,13 @@ public class TopicService {
@Inject
Supplier<Admin> clientSupplier;

@Inject
@Named("KafkaTopics")
Map<String, Map<String, Map<String, KafkaTopic>>> managedTopics;

@Inject
KubernetesClient k8s;

@Inject
ConfigService configService;

Expand Down Expand Up @@ -159,16 +170,20 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse

return listTopics(adminClient, true)
.thenApply(list -> list.stream().map(Topic::fromTopicListing).toList())
.thenComposeAsync(list -> augmentList(adminClient, list, fetchList, offsetSpec), threadContext.currentContextExecutor())
.thenComposeAsync(
list -> augmentList(adminClient, list, fetchList, offsetSpec),
threadContext.currentContextExecutor())
.thenApply(list -> list.stream()
.filter(listSupport)
.map(topic -> tallyStatus(statuses, topic))
.map(listSupport::tally)
.filter(listSupport::betweenCursors)
.sorted(listSupport.getSortComparator())
.dropWhile(listSupport::beforePageBegin)
.takeWhile(listSupport::pageCapacityAvailable)
.toList());
.takeWhile(listSupport::pageCapacityAvailable))
.thenApplyAsync(
topics -> topics.map(this::setManaged).toList(),
threadContext.currentContextExecutor());
}

Topic tallyStatus(Map<String, Integer> statuses, Topic topic) {
Expand All @@ -192,6 +207,7 @@ public CompletionStage<Topic> describeTopic(String topicId, List<String> fields,
CompletableFuture<Topic> describePromise = describeTopics(adminClient, List.of(id), fields, offsetSpec)
.thenApply(result -> result.get(id))
.thenApply(result -> result.getOrThrow(CompletionException::new))
.thenApplyAsync(this::setManaged, threadContext.currentContextExecutor())
.toCompletableFuture();

return describePromise.thenComposeAsync(topic -> {
Expand Down Expand Up @@ -221,30 +237,67 @@ public CompletionStage<Void> patchTopic(String topicId, TopicPatch patch, boolea
return describeTopic(topicId, List.of(Topic.Fields.CONFIGS), KafkaOffsetSpec.LATEST)
.thenApply(topic -> validationService.validate(new TopicValidation.TopicPatchInputs(kafka, topic, patch)))
.thenApply(TopicValidation.TopicPatchInputs::topic)
.thenComposeAsync(topic -> {
List<CompletableFuture<Void>> pending = new ArrayList<>();
.thenComposeAsync(topic -> getManagedTopic(topic.name())
.map(kafkaTopic -> patchManagedTopic())
.orElseGet(() -> patchUnmanagedTopic(topic, patch, validateOnly)),
threadContext.currentContextExecutor());
}

// Modifications disabled for now
/**
 * Patches a topic that is managed by a {@code KafkaTopic} custom resource.
 * Currently a no-op that completes successfully — API-driven modifications of
 * operator-managed topics are intentionally disabled. The intended
 * server-side-apply implementation is retained below (commented out, with the
 * matching parameters in the signature) for future enablement.
 *
 * @return a completed stage with a {@code null} result
 */
CompletionStage<Void> patchManagedTopic(/*KafkaTopic topic, TopicPatch patch, boolean validateOnly*/) {
    return CompletableFuture.completedStage(null);
//        if (validateOnly) { // NOSONAR
//            return CompletableFuture.completedStage(null);
//        }
//
//        Map<String, Object> modifiedConfig = Optional.ofNullable(patch.configs())
//            .map(Map::entrySet)
//            .map(Collection::stream)
//            .orElseGet(Stream::empty)
//            .map(e -> Map.entry(e.getKey(), e.getValue().getValue()))
//            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
//
//        KafkaTopic modifiedTopic = new KafkaTopicBuilder(topic)
//            .editSpec()
//                .withPartitions(patch.numPartitions())
//                .withReplicas(patch.replicasAssignments()
//                        .values()
//                        .stream()
//                        .findFirst()
//                        .map(Collection::size)
//                        .orElseGet(() -> topic.getSpec().getReplicas()))
//                .addToConfig(modifiedConfig)
//            .endSpec()
//            .build();
//
//        return CompletableFuture.runAsync(() -> k8s.resource(modifiedTopic).serverSideApply());
}

CompletionStage<Void> patchUnmanagedTopic(Topic topic, TopicPatch patch, boolean validateOnly) {
List<CompletableFuture<Void>> pending = new ArrayList<>();

pending.add(maybeCreatePartitions(topic, patch, validateOnly));

pending.add(maybeCreatePartitions(topic, patch, validateOnly));
if (!validateOnly) {
pending.addAll(maybeAlterPartitionAssignments(topic, patch));
if (!validateOnly) {
pending.addAll(maybeAlterPartitionAssignments(topic, patch));
}

pending.add(maybeAlterConfigs(topic, patch, validateOnly));

return CompletableFuture.allOf(pending.stream().toArray(CompletableFuture[]::new))
.whenComplete((nothing, error) -> {
if (error != null) {
pending.stream()
.filter(CompletableFuture::isCompletedExceptionally)
.forEach(fut -> fut.exceptionally(ex -> {
if (ex instanceof CompletionException ce) {
ex = ce.getCause();
}
error.addSuppressed(ex);
return null;
}));
}
pending.add(maybeAlterConfigs(topic, patch, validateOnly));

return CompletableFuture.allOf(pending.stream().toArray(CompletableFuture[]::new))
.whenComplete((nothing, error) -> {
if (error != null) {
pending.stream()
.filter(CompletableFuture::isCompletedExceptionally)
.forEach(fut -> fut.exceptionally(ex -> {
if (ex instanceof CompletionException ce) {
ex = ce.getCause();
}
error.addSuppressed(ex);
return null;
}));
}
});
}, threadContext.currentContextExecutor());
});
}

CompletableFuture<Void> maybeCreatePartitions(Topic topic, TopicPatch topicPatch, boolean validateOnly) {
Expand Down Expand Up @@ -373,6 +426,31 @@ public CompletionStage<Void> deleteTopic(String topicId) {
.toCompletionStage();
}

/**
 * Annotates the topic with a {@code managed} meta flag: {@code true} when a
 * Strimzi-managed {@code KafkaTopic} resource exists for it, else {@code false}.
 *
 * @param topic the topic to annotate
 * @return the same topic instance, for fluent use in mapping pipelines
 */
Topic setManaged(Topic topic) {
    boolean managed = getManagedTopic(topic.name()).isPresent();
    topic.addMeta("managed", managed);
    return topic;
}

/**
 * Looks up the {@code KafkaTopic} resource for the named topic within the
 * current Kafka cluster's namespace and cluster entry of the informer-maintained
 * map, returning it only when Strimzi considers it managed.
 *
 * @param topicName name of the Kafka topic to resolve
 * @return the managed KafkaTopic resource, or empty when none applies
 */
Optional<KafkaTopic> getManagedTopic(String topicName) {
    ObjectMeta kafkaMeta = kafkaCluster.get().getMetadata();

    Map<String, Map<String, KafkaTopic>> clustersInNamespace = managedTopics.get(kafkaMeta.getNamespace());
    if (clustersInNamespace == null) {
        return Optional.empty();
    }

    Map<String, KafkaTopic> topicsInCluster = clustersInNamespace.get(kafkaMeta.getName());
    if (topicsInCluster == null) {
        return Optional.empty();
    }

    return Optional.ofNullable(topicsInCluster.get(topicName)).filter(this::isManaged);
}

/**
 * A {@code KafkaTopic} is considered managed unless it is explicitly annotated
 * {@code strimzi.io/managed: "false"}. Missing metadata, missing annotations,
 * or an absent annotation all default to managed.
 *
 * @param topic the KafkaTopic resource to inspect (must not be null)
 * @return true when the topic operator manages this topic
 */
boolean isManaged(KafkaTopic topic) {
    ObjectMeta meta = topic.getMetadata();
    Map<String, String> annotations = (meta != null) ? meta.getAnnotations() : null;

    if (annotations == null) {
        return true;
    }

    String managed = annotations.getOrDefault("strimzi.io/managed", "true");
    return !"false".equals(managed);
}

CompletionStage<List<Topic>> augmentList(Admin adminClient, List<Topic> list, List<String> fields, String offsetSpec) {
Map<Uuid, Topic> topics = list.stream().collect(Collectors.toMap(t -> Uuid.fromString(t.getId()), Function.identity()));
CompletableFuture<Void> configPromise = maybeDescribeConfigs(adminClient, topics, fields);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.github.eyefloaters.console.api.support;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Startup;
Expand All @@ -8,13 +12,22 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;

import org.jboss.logging.Logger;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaTopic;

@ApplicationScoped
public class InformerFactory {

private static final String STRIMZI_CLUSTER = "strimzi.io/cluster";

@Inject
Logger logger;

@Inject
KubernetesClient k8s;

Expand All @@ -23,16 +36,67 @@ public class InformerFactory {
@Named("KafkaInformer")
SharedIndexInformer<Kafka> kafkaInformer;

@Produces
@ApplicationScoped
@Named("KafkaTopicInformer")
SharedIndexInformer<KafkaTopic> topicInformer;

@Produces
@ApplicationScoped
@Named("KafkaTopics")
// Keys: namespace -> cluster name -> topic name
Map<String, Map<String, Map<String, KafkaTopic>>> topics = new ConcurrentHashMap<>();

/**
 * Initialize CDI beans produced by this factory. Executed on application startup.
 * Starts cluster-wide informers for the Kafka and KafkaTopic custom resources
 * and keeps the shared {@code topics} map (namespace -> cluster -> topic name)
 * in sync with KafkaTopic add/update/delete events.
 *
 * @param event CDI startup event
 */
void onStartup(@Observes Startup event) {
    kafkaInformer = k8s.resources(Kafka.class).inAnyNamespace().inform();
    topicInformer = k8s.resources(KafkaTopic.class).inAnyNamespace().inform();
    topicInformer.addEventHandler(new ResourceEventHandler<KafkaTopic>() {
        @Override
        public void onAdd(KafkaTopic topic) {
            // Index by the actual Kafka topic name from the spec.
            // NOTE(review): assumes spec and spec.topicName are non-null — confirm;
            // `spec.topicName` is optional on KafkaTopic and this would NPE if unset.
            topicMap(topic).ifPresent(map -> map.put(topic.getSpec().getTopicName(), topic));
        }

        @Override
        public void onUpdate(KafkaTopic oldTopic, KafkaTopic topic) {
            // Remove the previous entry first so a changed namespace, cluster
            // label, or topic name does not leave a stale entry behind.
            onDelete(oldTopic, false);
            onAdd(topic);
        }

        @Override
        public void onDelete(KafkaTopic topic, boolean deletedFinalStateUnknown) {
            topicMap(topic).ifPresent(map -> map.remove(topic.getSpec().getTopicName()));
        }

        // Resolves (creating on demand) the per-cluster topic map for the
        // namespace and `strimzi.io/cluster` label of the given KafkaTopic.
        // Returns empty when the cluster label is missing; such topics are
        // logged and ignored.
        Optional<Map<String, KafkaTopic>> topicMap(KafkaTopic topic) {
            String namespace = topic.getMetadata().getNamespace();
            // NOTE(review): getLabels() may be null for a resource with no
            // labels at all — would NPE here; confirm informer always sets it.
            String clusterName = topic.getMetadata().getLabels().get(STRIMZI_CLUSTER);

            if (clusterName == null) {
                logger.warnf("KafkaTopic %s/%s is missing label %s and will be ignored",
                        namespace,
                        topic.getMetadata().getName(),
                        STRIMZI_CLUSTER);
                return Optional.empty();
            }

            // ConcurrentHashMap at both levels: handler events and readers may
            // touch this map from different threads.
            Map<String, KafkaTopic> map = topics.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
                    .computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>());

            return Optional.of(map);
        }
    });
}

/**
 * Stops the Kafka informer's watch and event processing when the CDI
 * container disposes the produced bean (application shutdown).
 *
 * @param informer the informer instance produced by this factory
 */
public void disposeKafkaInformer(@Disposes @Named("KafkaInformer") SharedIndexInformer<Kafka> informer) {
    informer.close();
}

/**
 * Stops the KafkaTopic informer's watch and event processing when the CDI
 * container disposes the produced bean (application shutdown).
 *
 * @param informer the informer instance produced by this factory
 */
public void disposeTopicInformer(@Disposes @Named("KafkaTopicInformer") SharedIndexInformer<KafkaTopic> informer) {
    informer.close();
}
}
Loading

0 comments on commit 00b9665

Please sign in to comment.