Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to handle different cluster for InformerEventSource #2499

Merged
merged 6 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/content/en/docs/features/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,25 @@ parts of reconciliation logic and during the execution of the controller:

For more information about MDC see this [link](https://www.baeldung.com/mdc-in-log4j-2-logback).

## InformerEventSource Multi-Cluster Support

It is possible to handle resources for remote cluster with `InformerEventSource`. To do so,
simply set a client that connects to a remote cluster:

```java

InformerEventSourceConfiguration<Tomcat> configuration =
InformerEventSourceConfiguration.from(SecondaryResource.class, PrimaryResource.class)
.withKubernetesClient(remoteClusterClient)
.withSecondaryToPrimaryMapper(Mappers.fromDefaultAnnotations());

```

You will also need to specify a `SecondaryToPrimaryMapper`, since the default one
is based on owner references and won't work across cluster instances. You could, for example, use the provided implementation that relies on annotations added to the secondary resources to identify the associated primary resource.

See related [integration test](https://github.com/operator-framework/java-operator-sdk/tree/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/informerremotecluster).

## Dynamically Changing Target Namespaces

A controller can be configured to watch a specific set of namespaces in addition of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.Informable;
import io.javaoperatorsdk.operator.processing.GroupVersionKind;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
Expand Down Expand Up @@ -57,22 +58,33 @@ default String name() {
return getInformerConfig().getName();
}

/**
* Optional, specific kubernetes client, typically to connect to a different cluster than the rest
* of the operator. Note that this is solely for multi cluster support.
*/
default Optional<KubernetesClient> getKubernetesClient() {
return Optional.empty();
}

class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
implements InformerEventSourceConfiguration<R> {
private final PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private final SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private final GroupVersionKind groupVersionKind;
private final InformerConfiguration<R> informerConfig;
private final KubernetesClient kubernetesClient;

protected DefaultInformerEventSourceConfiguration(
GroupVersionKind groupVersionKind,
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
InformerConfiguration<R> informerConfig) {
InformerConfiguration<R> informerConfig,
KubernetesClient kubernetesClient) {
this.informerConfig = Objects.requireNonNull(informerConfig);
this.groupVersionKind = groupVersionKind;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
this.kubernetesClient = kubernetesClient;
}

@Override
Expand All @@ -95,8 +107,12 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
public Optional<GroupVersionKind> getGroupVersionKind() {
return Optional.ofNullable(groupVersionKind);
}
}

@Override
public Optional<KubernetesClient> getKubernetesClient() {
return Optional.ofNullable(kubernetesClient);
}
}

@SuppressWarnings({"unused", "UnusedReturnValue"})
class Builder<R extends HasMetadata> {
Expand All @@ -108,6 +124,7 @@ class Builder<R extends HasMetadata> {
private String name;
private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private KubernetesClient kubernetesClient;

private Builder(Class<R> resourceClass,
Class<? extends HasMetadata> primaryResourceClass) {
Expand Down Expand Up @@ -152,6 +169,16 @@ public Builder<R> withSecondaryToPrimaryMapper(
return this;
}

/**
* Use this is case want to create an InformerEventSource that handles resources from different
* cluster.
*/
public Builder<R> withKubernetesClient(
KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
return this;
}

public String getName() {
return name;
}
Expand Down Expand Up @@ -192,7 +219,7 @@ public InformerEventSourceConfiguration<R> build() {
Objects.requireNonNullElse(secondaryToPrimaryMapper,
Mappers.fromOwnerReferences(HasMetadata.getApiVersion(primaryResourceClass),
HasMetadata.getKind(primaryResourceClass), false)),
config.buildForInformerEventSource());
config.buildForInformerEventSource(), kubernetesClient);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.*;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -67,18 +69,17 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
extends ManagedInformerEventSource<R, P, InformerEventSourceConfiguration<R>>
implements ResourceEventHandler<R> {

public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";

private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);

public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
// we need direct control for the indexer to propagate the just update resource also to the index
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
private final String id = UUID.randomUUID().toString();

public InformerEventSource(
InformerEventSourceConfiguration<R> configuration, EventSourceContext<P> context) {
this(configuration, context.getClient(),
this(configuration,
configuration.getKubernetesClient().orElse(context.getClient()),
context.getControllerConfiguration().getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching());
}
Expand Down Expand Up @@ -287,10 +288,6 @@ private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObj
}
}

private enum Operation {
ADD, UPDATE
}

private boolean acceptedByDeleteFilters(R resource, boolean b) {
return (onDeleteFilter == null || onDeleteFilter.accept(resource, b)) &&
(genericFilter == null || genericFilter.accept(resource));
Expand All @@ -307,4 +304,8 @@ public R addPreviousAnnotation(String resourceVersion, R target) {
id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse(""));
return target;
}

private enum Operation {
ADD, UPDATE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("irc")
public class InformerRemoteClusterCustomResource
extends CustomResource<Void, InformerRemoteClusterStatus> implements Namespaced {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;

import java.util.Map;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;

import static io.javaoperatorsdk.operator.baseapi.informerremotecluster.InformerRemoteClusterReconciler.DATA_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@EnableKubeAPIServer
class InformerRemoteClusterIT {

public static final String NAME = "test1";
public static final String CONFIG_MAP_NAME = "testcm";
public static final String INITIAL_VALUE = "initial_value";
public static final String CHANGED_VALUE = "changed_value";
public static final String CM_NAMESPACE = "default";

// injected by Kube API Test. Client for another cluster.
static KubernetesClient kubernetesClient;

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(new InformerRemoteClusterReconciler(kubernetesClient))
.build();

@Test
void testRemoteClusterInformer() {
var r = extension.create(testCustomResource());

var cm = kubernetesClient.configMaps()
.resource(remoteConfigMap(r.getMetadata().getName(),
r.getMetadata().getNamespace()))
.create();

// config map does not exist on the primary resource cluster
assertThat(extension.getKubernetesClient().configMaps()
.inNamespace(CM_NAMESPACE)
.withName(CONFIG_MAP_NAME).get()).isNull();

await().untilAsserted(() -> {
var cr = extension.get(InformerRemoteClusterCustomResource.class, NAME);
assertThat(cr.getStatus()).isNotNull();
assertThat(cr.getStatus().getRemoteConfigMapMessage()).isEqualTo(INITIAL_VALUE);
});

cm.getData().put(DATA_KEY, CHANGED_VALUE);
kubernetesClient.configMaps().resource(cm).update();

await().untilAsserted(() -> {
var cr = extension.get(InformerRemoteClusterCustomResource.class, NAME);
assertThat(cr.getStatus().getRemoteConfigMapMessage()).isEqualTo(CHANGED_VALUE);
});
}

InformerRemoteClusterCustomResource testCustomResource() {
var res = new InformerRemoteClusterCustomResource();
res.setMetadata(new ObjectMetaBuilder()
.withName(NAME)
.build());
return res;
}

ConfigMap remoteConfigMap(String ownerName, String ownerNamespace) {
return new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName(CONFIG_MAP_NAME)
.withNamespace(CM_NAMESPACE)
.withAnnotations(Map.of(
Mappers.DEFAULT_ANNOTATION_FOR_NAME, ownerName,
Mappers.DEFAULT_ANNOTATION_FOR_NAMESPACE, ownerNamespace))
.build())
.withData(Map.of(DATA_KEY, INITIAL_VALUE))
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;

import java.util.List;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;

@ControllerConfiguration
public class InformerRemoteClusterReconciler
implements Reconciler<InformerRemoteClusterCustomResource> {

public static final String DATA_KEY = "key";

private final KubernetesClient remoteClient;

public InformerRemoteClusterReconciler(KubernetesClient remoteClient) {
this.remoteClient = remoteClient;
}

@Override
public UpdateControl<InformerRemoteClusterCustomResource> reconcile(
InformerRemoteClusterCustomResource resource,
Context<InformerRemoteClusterCustomResource> context) throws Exception {

return context.getSecondaryResource(ConfigMap.class).map(cm -> {
var r = new InformerRemoteClusterCustomResource();
r.setMetadata(new ObjectMetaBuilder()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.build());
r.setStatus(new InformerRemoteClusterStatus());
r.getStatus().setRemoteConfigMapMessage(cm.getData().get(DATA_KEY));
return UpdateControl.patchStatus(r);
}).orElseGet(UpdateControl::noUpdate);
}

@Override
public List<EventSource<?, InformerRemoteClusterCustomResource>> prepareEventSources(
EventSourceContext<InformerRemoteClusterCustomResource> context) {

var es = new InformerEventSource<>(InformerEventSourceConfiguration
.from(ConfigMap.class, InformerRemoteClusterCustomResource.class)
// owner references do not work cross cluster, using
// annotations here to reference primary resource
.withSecondaryToPrimaryMapper(Mappers.fromDefaultAnnotations())
// setting remote client for informer
.withKubernetesClient(remoteClient)
.withInformerConfiguration(
InformerConfiguration.Builder::withWatchAllNamespaces)
.build(), context);

return List.of(es);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;

public class InformerRemoteClusterStatus {

private String remoteConfigMapMessage;

public String getRemoteConfigMapMessage() {
return remoteConfigMapMessage;
}

public void setRemoteConfigMapMessage(String remoteConfigMapMessage) {
this.remoteConfigMapMessage = remoteConfigMapMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class LabelSelectorTestReconciler
@Override
public UpdateControl<LabelSelectorTestCustomResource> reconcile(
LabelSelectorTestCustomResource resource, Context<LabelSelectorTestCustomResource> context) {

numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
}
Expand Down
Loading