Skip to content

Commit

Permalink
Merge branch 'next' into generalized_match
Browse files Browse the repository at this point in the history
  • Loading branch information
csviri authored Aug 14, 2023
2 parents 7bd9674 + d7b5b87 commit 54a65a6
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,28 @@ private void init(LeaderElectionConfiguration config) {
config.getLeaseDuration(),
config.getRenewDeadline(),
config.getRetryPeriod(),
leaderCallbacks(),
leaderCallbacks(config),
true,
config.getLeaseName()))
.build();
}



private LeaderCallbacks leaderCallbacks() {
private LeaderCallbacks leaderCallbacks(LeaderElectionConfiguration config) {
return new LeaderCallbacks(
this::startLeading,
this::stopLeading,
leader -> log.info("New leader with identity: {}", leader));
() -> {
config.getLeaderCallbacks().ifPresent(LeaderCallbacks::onStartLeading);
LeaderElectionManager.this.startLeading();
},
() -> {
config.getLeaderCallbacks().ifPresent(LeaderCallbacks::onStopLeading);
LeaderElectionManager.this.stopLeading();
},
leader -> {
config.getLeaderCallbacks().ifPresent(cb -> cb.onNewLeader(leader));
log.info("New leader with identity: {}", leader);
});
}

private void startLeading() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.time.Duration;
import java.util.Optional;

import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;

public class LeaderElectionConfiguration {

public static final Duration LEASE_DURATION_DEFAULT_VALUE = Duration.ofSeconds(15);
Expand All @@ -17,13 +19,15 @@ public class LeaderElectionConfiguration {
private final Duration renewDeadline;
private final Duration retryPeriod;

private final LeaderCallbacks leaderCallbacks;

public LeaderElectionConfiguration(String leaseName, String leaseNamespace, String identity) {
this(
leaseName,
leaseNamespace,
LEASE_DURATION_DEFAULT_VALUE,
RENEW_DEADLINE_DEFAULT_VALUE,
RETRY_PERIOD_DEFAULT_VALUE, identity);
RETRY_PERIOD_DEFAULT_VALUE, identity, null);
}

public LeaderElectionConfiguration(String leaseName, String leaseNamespace) {
Expand All @@ -32,7 +36,7 @@ public LeaderElectionConfiguration(String leaseName, String leaseNamespace) {
leaseNamespace,
LEASE_DURATION_DEFAULT_VALUE,
RENEW_DEADLINE_DEFAULT_VALUE,
RETRY_PERIOD_DEFAULT_VALUE, null);
RETRY_PERIOD_DEFAULT_VALUE, null, null);
}

public LeaderElectionConfiguration(String leaseName) {
Expand All @@ -41,7 +45,7 @@ public LeaderElectionConfiguration(String leaseName) {
null,
LEASE_DURATION_DEFAULT_VALUE,
RENEW_DEADLINE_DEFAULT_VALUE,
RETRY_PERIOD_DEFAULT_VALUE, null);
RETRY_PERIOD_DEFAULT_VALUE, null, null);
}

public LeaderElectionConfiguration(
Expand All @@ -50,7 +54,7 @@ public LeaderElectionConfiguration(
Duration leaseDuration,
Duration renewDeadline,
Duration retryPeriod) {
this(leaseName, leaseNamespace, leaseDuration, renewDeadline, retryPeriod, null);
this(leaseName, leaseNamespace, leaseDuration, renewDeadline, retryPeriod, null, null);
}

public LeaderElectionConfiguration(
Expand All @@ -59,13 +63,15 @@ public LeaderElectionConfiguration(
Duration leaseDuration,
Duration renewDeadline,
Duration retryPeriod,
String identity) {
String identity,
LeaderCallbacks leaderCallbacks) {
this.leaseName = leaseName;
this.leaseNamespace = leaseNamespace;
this.leaseDuration = leaseDuration;
this.renewDeadline = renewDeadline;
this.retryPeriod = retryPeriod;
this.identity = identity;
this.leaderCallbacks = leaderCallbacks;
}

public Optional<String> getLeaseNamespace() {
Expand All @@ -91,4 +97,8 @@ public Duration getRetryPeriod() {
public Optional<String> getIdentity() {
return Optional.ofNullable(identity);
}

public Optional<LeaderCallbacks> getLeaderCallbacks() {
return Optional.ofNullable(leaderCallbacks);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;

import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;

import static io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration.*;

public final class LeaderElectionConfigurationBuilder {

private String leaseName;
private String leaseNamespace;
private String identity;
private Duration leaseDuration = LEASE_DURATION_DEFAULT_VALUE;
private Duration renewDeadline = RENEW_DEADLINE_DEFAULT_VALUE;
private Duration retryPeriod = RETRY_PERIOD_DEFAULT_VALUE;
private LeaderCallbacks leaderCallbacks;

private LeaderElectionConfigurationBuilder(String leaseName) {
this.leaseName = leaseName;
}

public static LeaderElectionConfigurationBuilder aLeaderElectionConfiguration(String leaseName) {
return new LeaderElectionConfigurationBuilder(leaseName);
}

public LeaderElectionConfigurationBuilder withLeaseNamespace(String leaseNamespace) {
this.leaseNamespace = leaseNamespace;
return this;
}

public LeaderElectionConfigurationBuilder withIdentity(String identity) {
this.identity = identity;
return this;
}

public LeaderElectionConfigurationBuilder withLeaseDuration(Duration leaseDuration) {
this.leaseDuration = leaseDuration;
return this;
}

public LeaderElectionConfigurationBuilder withRenewDeadline(Duration renewDeadline) {
this.renewDeadline = renewDeadline;
return this;
}

public LeaderElectionConfigurationBuilder withRetryPeriod(Duration retryPeriod) {
this.retryPeriod = retryPeriod;
return this;
}

public LeaderElectionConfigurationBuilder withLeaderCallbacks(LeaderCallbacks leaderCallbacks) {
this.leaderCallbacks = leaderCallbacks;
return this;
}

public LeaderElectionConfiguration build() {
return new LeaderElectionConfiguration(leaseName, leaseNamespace, leaseDuration, renewDeadline,
retryPeriod, identity, leaderCallbacks);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.function.Function;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;

/**
* Uses a custom index of {@link InformerEventSource} to access the target resource. The index needs
* to be explicitly created when the event source is defined. This approach improves the performance
* to access the resource.
*/
public class IndexDiscriminator<R extends HasMetadata, P extends HasMetadata>
implements ResourceDiscriminator<R, P> {

private final String indexName;
private final String eventSourceName;
private final Function<P, String> keyMapper;

public IndexDiscriminator(String indexName, Function<P, String> keyMapper) {
this(indexName, null, keyMapper);
}

public IndexDiscriminator(String indexName, String eventSourceName,
Function<P, String> keyMapper) {
this.indexName = indexName;
this.eventSourceName = eventSourceName;
this.keyMapper = keyMapper;
}

@Override
public Optional<R> distinguish(Class<R> resource,
P primary,
Context<P> context) {

InformerEventSource<R, P> eventSource =
(InformerEventSource<R, P>) context
.eventSourceRetriever()
.getResourceEventSourceFor(resource, eventSourceName);
var resources = eventSource.byIndex(indexName, keyMapper.apply(primary));
if (resources.isEmpty()) {
return Optional.empty();
} else if (resources.size() > 1) {
throw new IllegalStateException("More than one resource found");
} else {
return Optional.of(resources.get(0));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,41 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.Cache;

public class ResourceIDMatcherDiscriminator<R extends HasMetadata, P extends HasMetadata>
implements ResourceDiscriminator<R, P> {


private final String eventSourceName;
private final Function<P, ResourceID> mapper;

public ResourceIDMatcherDiscriminator(Function<P, ResourceID> mapper) {
this(null, mapper);
}

public ResourceIDMatcherDiscriminator(String eventSourceName, Function<P, ResourceID> mapper) {
this.eventSourceName = eventSourceName;
this.mapper = mapper;
}

@SuppressWarnings("unchecked")
@Override
public Optional<R> distinguish(Class<R> resource, P primary, Context<P> context) {
var resourceID = mapper.apply(primary);
return context.getSecondaryResourcesAsStream(resource)
.filter(resourceID::isSameResource)
.findFirst();
if (eventSourceName != null) {
return ((Cache<R>) context.eventSourceRetriever().getResourceEventSourceFor(resource,
eventSourceName))
.get(resourceID);
} else {
var eventSources = context.eventSourceRetriever().getResourceEventSourcesFor(resource);
if (eventSources.size() == 1) {
return ((Cache<R>) eventSources.get(0)).get(resourceID);
} else {
return context.getSecondaryResourcesAsStream(resource)
.filter(resourceID::isSameResource)
.findFirst();
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public Map<String, EventSource> prepareEventSources(

firstDependentResourceConfigMap
.setResourceDiscriminator(
new IndexDiscriminator(CONFIG_MAP_INDEX_1, FIRST_CONFIG_MAP_SUFFIX_1));
new TestIndexDiscriminator(CONFIG_MAP_INDEX_1, FIRST_CONFIG_MAP_SUFFIX_1));
secondDependentResourceConfigMap
.setResourceDiscriminator(
new IndexDiscriminator(CONFIG_MAP_INDEX_2, FIRST_CONFIG_MAP_SUFFIX_2));
new TestIndexDiscriminator(CONFIG_MAP_INDEX_2, FIRST_CONFIG_MAP_SUFFIX_2));
return EventSourceInitializer.nameEventSources(eventSource);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.sample.indexdiscriminator;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.javaoperatorsdk.operator.api.reconciler.IndexDiscriminator;

import static io.javaoperatorsdk.operator.sample.indexdiscriminator.IndexDiscriminatorTestReconciler.configMapKeyFromPrimary;

public class TestIndexDiscriminator
extends IndexDiscriminator<ConfigMap, IndexDiscriminatorTestCustomResource> {

public TestIndexDiscriminator(String indexName, String nameSuffix) {
super(indexName, p -> configMapKeyFromPrimary(p, nameSuffix));
}
}

0 comments on commit 54a65a6

Please sign in to comment.