From 6c4b1d07ab64dacb4e65cb7c70a13f886cc5b407 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Tue, 8 Aug 2023 12:30:52 -0400 Subject: [PATCH] draft of changes to simplify the recording mechanism --- .../dependent/RecentOperationEventFilter.java | 11 -- .../KubernetesDependentResource.java | 44 ++---- ...BasedGenericKubernetesResourceMatcher.java | 9 ++ .../event/source/informer/EventRecorder.java | 72 ---------- .../source/informer/InformerEventSource.java | 131 ++++++------------ .../source/informer/EventRecorderTest.java | 81 ----------- .../informer/InformerEventSourceTest.java | 109 --------------- ...CreateUpdateEventFilterTestReconciler.java | 45 ++---- 8 files changed, 77 insertions(+), 425 deletions(-) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationEventFilter.java delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java delete mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationEventFilter.java deleted file mode 100644 index d48343e57c..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationEventFilter.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.javaoperatorsdk.operator.api.reconciler.dependent; - -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public interface RecentOperationEventFilter extends RecentOperationCacheFiller { - - void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, R resource); - - void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID); - -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index d03d96dee3..b31f3c56c6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -37,6 +37,8 @@ public abstract class KubernetesDependentResource> { + public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous"; + private static final Logger log = LoggerFactory.getLogger(KubernetesDependentResource.class); protected KubernetesClient client; @@ -103,35 +105,14 @@ public void configureWith(InformerEventSource informerEventSource) { setEventSource(informerEventSource); } - - protected R handleCreate(R desired, P primary, Context

context) { - ResourceID resourceID = ResourceID.fromResource(desired); - try { - prepareEventFiltering(desired, resourceID); - return super.handleCreate(desired, primary, context); - } catch (RuntimeException e) { - cleanupAfterEventFiltering(resourceID); - throw e; - } - } - - protected R handleUpdate(R actual, R desired, P primary, Context

context) { - ResourceID resourceID = ResourceID.fromResource(desired); - try { - prepareEventFiltering(desired, resourceID); - return super.handleUpdate(actual, desired, primary, context); - } catch (RuntimeException e) { - cleanupAfterEventFiltering(resourceID); - throw e; - } - } - @SuppressWarnings("unused") public R create(R target, P primary, Context

context) { if (useSSA(context)) { // setting resource version for SSA so only created if it doesn't exist already target.getMetadata().setResourceVersion("1"); } + String id = ((InformerEventSource) eventSource().orElseThrow()).getId(); + target.getMetadata().getAnnotations().put(PREVIOUS_ANNOTATION_KEY, id); final var resource = prepare(target, primary, "Creating"); return useSSA(context) ? resource @@ -147,6 +128,9 @@ public R update(R actual, R target, P primary, Context

context) { actual.getMetadata().getResourceVersion()); } R updatedResource; + String id = ((InformerEventSource) eventSource().orElseThrow()).getId(); + target.getMetadata().getAnnotations().put(PREVIOUS_ANNOTATION_KEY, + id + "," + actual.getMetadata().getResourceVersion()); if (useSSA(context)) { target.getMetadata().setResourceVersion(actual.getMetadata().getResourceVersion()); updatedResource = prepare(target, primary, "Updating") @@ -154,13 +138,14 @@ public R update(R actual, R target, P primary, Context

context) { .forceConflicts().serverSideApply(); } else { var updatedActual = updaterMatcher.updateResource(actual, target, context); - updatedResource = prepare(updatedActual, primary, "Updating").replace(); + updatedResource = prepare(updatedActual, primary, "Updating").update(); } log.debug("Resource version after update: {}", updatedResource.getMetadata().getResourceVersion()); return updatedResource; } + @Override public Result match(R actualResource, P primary, Context

context) { final var desired = desired(primary, context); final boolean matches; @@ -193,6 +178,7 @@ private boolean useSSA(Context

context) { .ssaBasedCreateUpdateMatchForDependentResources(); } + @Override protected void handleDelete(P primary, R secondary, Context

context) { if (secondary != null) { client.resource(secondary).delete(); @@ -290,16 +276,6 @@ protected R desired(P primary, Context

context) { return super.desired(primary, context); } - private void prepareEventFiltering(R desired, ResourceID resourceID) { - ((InformerEventSource) eventSource().orElseThrow()) - .prepareForCreateOrUpdateEventFiltering(resourceID, desired); - } - - private void cleanupAfterEventFiltering(ResourceID resourceID) { - ((InformerEventSource) eventSource().orElseThrow()) - .cleanupOnCreateOrUpdateEventFiltering(resourceID); - } - @Override public Optional> configuration() { return Optional.ofNullable(kubernetesDependentResourceConfig); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/SSABasedGenericKubernetesResourceMatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/SSABasedGenericKubernetesResourceMatcher.java index f4718b45c3..b7622eac77 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/SSABasedGenericKubernetesResourceMatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/SSABasedGenericKubernetesResourceMatcher.java @@ -90,6 +90,15 @@ public boolean matches(R actual, R desired, Context context) { keepOnlyManagedFields(prunedActual, actualMap, managedFieldsEntry.getFieldsV1().getAdditionalProperties(), objectMapper); + ((Map) prunedActual.get(METADATA_KEY)).computeIfPresent("annotations", (k, v) -> { + var annotations = (Map)v; + annotations.remove(KubernetesDependentResource.PREVIOUS_ANNOTATION_KEY); + if (annotations.isEmpty()) { + return null; + } + return annotations; + }); + removeIrrelevantValues(desiredMap); if (LoggingUtils.isNotSensitiveResource(desired)) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java deleted file mode 100644 index 5d23d870aa..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public class EventRecorder { - - private final Map> resourceEvents = new HashMap<>(); - - public void startEventRecording(ResourceID resourceID) { - resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5)); - } - - public boolean isRecordingFor(ResourceID resourceID) { - return resourceEvents.get(resourceID) != null; - } - - public void stopEventRecording(ResourceID resourceID) { - resourceEvents.remove(resourceID); - } - - public void recordEvent(R resource) { - resourceEvents.get(ResourceID.fromResource(resource)).add(resource); - } - - public boolean containsEventWithResourceVersion(ResourceID resourceID, - String resourceVersion) { - List events = resourceEvents.get(resourceID); - if (events == null) { - return false; - } - if (events.isEmpty()) { - return false; - } else { - return events.stream() - .anyMatch(e -> e.getMetadata().getResourceVersion().equals(resourceVersion)); - } - } - - public boolean containsEventWithVersionButItsNotLastOne( - ResourceID resourceID, String resourceVersion) { - List resources = resourceEvents.get(resourceID); - if (resources == null) { - throw new IllegalStateException( - "Null events list, this is probably a result of invalid usage of the " + - "InformerEventSource. Resource ID: " + resourceID); - } - if (resources.isEmpty()) { - throw new IllegalStateException("No events for resource id: " + resourceID); - } - return !resources - .get(resources.size() - 1) - .getMetadata() - .getResourceVersion() - .equals(resourceVersion); - } - - public R getLastEvent(ResourceID resourceID) { - List resources = resourceEvents.get(resourceID); - if (resources == null) { - throw new IllegalStateException( - "Null events list, this is probably a result of invalid usage of the " + - "InformerEventSource. Resource ID: " + resourceID); - } - return resources.get(resources.size() - 1); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 8cca524464..53ac06a68a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -18,7 +19,7 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -72,17 +73,16 @@ */ public class InformerEventSource extends ManagedInformerEventSource> - implements ResourceEventHandler, RecentOperationEventFilter { + implements ResourceEventHandler { private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); private final InformerConfiguration configuration; - // always called from a synchronized method - private final EventRecorder eventRecorder = new EventRecorder<>(); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; private Map>> indexerBuffer = new HashMap<>(); + private String id = UUID.randomUUID().toString(); public InformerEventSource( InformerConfiguration configuration, EventSourceContext

context) { @@ -110,6 +110,10 @@ public InformerEventSource(InformerConfiguration configuration, KubernetesCli genericFilter = configuration.genericFilter().orElse(null); } + public String getId() { + return id; + } + @Override public void onAdd(R newResource) { if (log.isDebugEnabled()) { @@ -154,12 +158,23 @@ public void onDelete(R resource, boolean b) { private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - if (eventRecorder.isRecordingFor(resourceID)) { - log.debug("Recording event for: {}", resourceID); - eventRecorder.recordEvent(newObject); - return; + + String previous = newObject.getMetadata().getAnnotations() + .get(KubernetesDependentResource.PREVIOUS_ANNOTATION_KEY); + boolean known = false; + if (previous != null) { + String[] parts = previous.split(","); + if (id.equals(parts[0])) { + if (oldObject == null && parts.length == 1) { + known = true; + } else if (oldObject != null && parts.length == 2 + && oldObject.getMetadata().getResourceVersion().equals(parts[1])) { + known = true; + } + } } - if (temporaryCacheHasResourceWithSameVersionAs(newObject)) { + if (temporaryCacheHasResourceWithSameVersionAs(newObject) + || (known && temporaryResourceCache.getResourceFromCache(resourceID).isEmpty())) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource ID: {}", operation, @@ -239,73 +254,30 @@ public InformerConfiguration getConfiguration() { @Override public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousVersionOfResource) { - handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource, - () -> super.handleRecentResourceUpdate(resourceID, resource, previousVersionOfResource)); + handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource); } @Override public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { - handleRecentCreateOrUpdate(Operation.ADD, resource, null, - () -> super.handleRecentResourceCreate(resourceID, resource)); + handleRecentCreateOrUpdate(Operation.ADD, resource, null); } - private void handleRecentCreateOrUpdate(Operation operation, R resource, R oldResource, - Runnable runnable) { - primaryToSecondaryIndex.onAddOrUpdate(resource); - if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) { - handleRecentResourceOperationAndStopEventRecording(operation, resource, oldResource); - } else { - runnable.run(); - } - } - - /** - * There can be the following cases: - *

    - *
  • 1. Did not receive the event yet for the target resource, then we need to put it to temp - * cache. Because event will arrive. Note that this not necessary mean that the even is not sent - * yet (we are in sync context). Also does not mean that there are no more events received after - * that. But during the event processing (onAdd, onUpdate) we make sure that the propagation just - * skipped for the right event.
  • - *
  • 2. Received the event about the operation already, it was the last. This means already is - * on cache of informer. So we have to do nothing. Since it was just recorded and not propagated. - *
  • - *
  • 3. Received the event but more events received since, so those were not propagated yet. So - * an event needs to be propagated to compensate.
  • - *
- * - * @param newResource just created or updated resource - */ - private void handleRecentResourceOperationAndStopEventRecording(Operation operation, - R newResource, R oldResource) { + private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { + primaryToSecondaryIndex.onAddOrUpdate(newResource); ResourceID resourceID = ResourceID.fromResource(newResource); - try { - if (!eventRecorder.containsEventWithResourceVersion( - resourceID, newResource.getMetadata().getResourceVersion())) { - log.debug( - "Did not found event in buffer with target version and resource id: {}", resourceID); - temporaryResourceCache.unconditionallyCacheResource(newResource); - } else { - // if the resource is not added to the temp cache, it is cleared, since - // the cache is cleared by subsequent events after updates, but if those did not receive - // the temp cache is still filled at this point with an old resource - log.debug("Cleaning temporary cache for resource id: {}", resourceID); - temporaryResourceCache.removeResourceFromCache(newResource); - if (eventRecorder.containsEventWithVersionButItsNotLastOne( - resourceID, newResource.getMetadata().getResourceVersion())) { - R lastEvent = eventRecorder.getLastEvent(resourceID); - - log.debug( - "Found events in event buffer but the target event is not last for id: {}. Propagating event.", - resourceID); - if (eventAcceptedByFilter(operation, newResource, oldResource)) { - propagateEvent(lastEvent); - } - } - } - } finally { - log.debug("Stopping event recording for: {}", resourceID); - eventRecorder.stopEventRecording(resourceID); + R cachedResource = get(resourceID).orElse(null); + if ((oldResource == null && cachedResource == null) || (cachedResource != null && oldResource != null + && cachedResource.getMetadata().getResourceVersion().equals(oldResource.getMetadata().getResourceVersion()))) { + log.debug( + "Temporarily moving ahead to target version {} for resource id: {}", + newResource.getMetadata().getResourceVersion(), resourceID); + temporaryResourceCache.unconditionallyCacheResource(newResource); + } else { + // if the resource is not added to the temp cache, it is cleared, since + // the cache is cleared by subsequent events after updates, but if those did not receive + // the temp cache is still filled at this point with an old resource + log.debug("Cleaning temporary cache for resource id: {}", resourceID); + temporaryResourceCache.removeResourceFromCache(newResource); } } @@ -313,25 +285,6 @@ private boolean useSecondaryToPrimaryIndex() { return this.primaryToSecondaryMapper == null; } - @Override - public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, - R resource) { - log.debug("Starting event recording for: {}", resourceID); - eventRecorder.startEventRecording(resourceID); - } - - /** - * Mean to be called to clean up in case of an exception from the client. Usually in a catch - * block. - * - * @param resourceID to cleanup - */ - @Override - public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID) { - log.debug("Stopping event recording for: {}", resourceID); - eventRecorder.stopEventRecording(resourceID); - } - @Override public boolean allowsNamespaceChanges() { return getConfiguration().followControllerNamespaceChanges(); @@ -361,6 +314,7 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) { // Since this event source instance is created by the user, the ConfigurationService is actually // injected after it is registered. Some of the subcomponents are initialized at that time here. + @Override public void setConfigurationService(ConfigurationService configurationService) { super.setConfigurationService(configurationService); @@ -368,6 +322,7 @@ public void setConfigurationService(ConfigurationService configurationService) { indexerBuffer = null; } + @Override public void addIndexers(Map>> indexers) { if (indexerBuffer == null) { throw new OperatorException("Cannot add indexers after InformerEventSource started."); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java deleted file mode 100644 index 556ad089ff..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorderTest.java +++ /dev/null @@ -1,81 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import org.junit.jupiter.api.Test; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -import static org.assertj.core.api.Assertions.assertThat; - -class EventRecorderTest { - - public static final String RESOURCE_VERSION = "0"; - public static final String RESOURCE_VERSION1 = "1"; - - EventRecorder eventRecorder = new EventRecorder<>(); - - ConfigMap testConfigMap = testConfigMap(RESOURCE_VERSION); - ConfigMap testConfigMap2 = testConfigMap(RESOURCE_VERSION1); - - ResourceID id = ResourceID.fromResource(testConfigMap); - - @Test - void recordsEvents() { - - assertThat(eventRecorder.isRecordingFor(id)).isFalse(); - - eventRecorder.startEventRecording(id); - assertThat(eventRecorder.isRecordingFor(id)).isTrue(); - - eventRecorder.recordEvent(testConfigMap); - - eventRecorder.stopEventRecording(id); - assertThat(eventRecorder.isRecordingFor(id)).isFalse(); - } - - @Test - void getsLastRecorded() { - eventRecorder.startEventRecording(id); - - eventRecorder.recordEvent(testConfigMap); - eventRecorder.recordEvent(testConfigMap2); - - assertThat(eventRecorder.getLastEvent(id)).isEqualTo(testConfigMap2); - } - - @Test - void checksContainsWithResourceVersion() { - eventRecorder.startEventRecording(id); - - eventRecorder.recordEvent(testConfigMap); - eventRecorder.recordEvent(testConfigMap2); - - assertThat(eventRecorder.containsEventWithResourceVersion(id, RESOURCE_VERSION)).isTrue(); - assertThat(eventRecorder.containsEventWithResourceVersion(id, RESOURCE_VERSION1)).isTrue(); - assertThat(eventRecorder.containsEventWithResourceVersion(id, "xxx")).isFalse(); - } - - @Test - void checkLastItemVersion() { - eventRecorder.startEventRecording(id); - - eventRecorder.recordEvent(testConfigMap); - eventRecorder.recordEvent(testConfigMap2); - - assertThat(eventRecorder.containsEventWithVersionButItsNotLastOne(id, RESOURCE_VERSION)) - .isTrue(); - assertThat(eventRecorder.containsEventWithVersionButItsNotLastOne(id, RESOURCE_VERSION1)) - .isFalse(); - } - - ConfigMap testConfigMap(String resourceVersion) { - ConfigMap configMap = new ConfigMap(); - configMap.setMetadata(new ObjectMeta()); - configMap.getMetadata().setName("test"); - configMap.getMetadata().setResourceVersion(resourceVersion); - - return configMap; - } - -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 69e26f0b35..336de37c68 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -91,115 +91,6 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any()); } - @Test - void notPropagatesEventIfAfterUpdateReceivedJustTheRelatedEvent() { - var testDeployment = testDeployment(); - var prevTestDeployment = testDeployment(); - prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - - - informerEventSource - .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment), - testDeployment); - informerEventSource.onUpdate(prevTestDeployment, testDeployment); - informerEventSource.handleRecentResourceUpdate(ResourceID.fromResource(testDeployment), - testDeployment, prevTestDeployment); - - verify(eventHandlerMock, times(0)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); - } - - - @Test - void notPropagatesEventIfAfterCreateReceivedJustTheRelatedEvent() { - var testDeployment = testDeployment(); - - informerEventSource - .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment), - testDeployment); - informerEventSource.onAdd(testDeployment); - informerEventSource.handleRecentResourceCreate(ResourceID.fromResource(testDeployment), - testDeployment); - - verify(eventHandlerMock, times(0)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); - } - - @Test - void propagatesEventIfNewEventReceivedAfterTheCurrentTargetEvent() { - var testDeployment = testDeployment(); - var prevTestDeployment = testDeployment(); - prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - var nextTestDeployment = testDeployment(); - nextTestDeployment.getMetadata().setResourceVersion(NEXT_RESOURCE_VERSION); - - informerEventSource - .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment), - testDeployment); - informerEventSource.onUpdate(prevTestDeployment, testDeployment); - informerEventSource.onUpdate(testDeployment, nextTestDeployment); - informerEventSource.handleRecentResourceUpdate(ResourceID.fromResource(testDeployment), - testDeployment, prevTestDeployment); - - verify(eventHandlerMock, times(1)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); - } - - @Test - void notPropagatesEventIfMoreReceivedButTheLastIsTheUpdated() { - var testDeployment = testDeployment(); - var prevTestDeployment = testDeployment(); - prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - var prevPrevTestDeployment = testDeployment(); - prevPrevTestDeployment.getMetadata().setResourceVersion("-1"); - - informerEventSource - .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment), - testDeployment); - informerEventSource.onUpdate(prevPrevTestDeployment, prevTestDeployment); - informerEventSource.onUpdate(prevTestDeployment, testDeployment); - informerEventSource.handleRecentResourceUpdate(ResourceID.fromResource(testDeployment), - testDeployment, prevTestDeployment); - - verify(eventHandlerMock, times(0)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(0)).unconditionallyCacheResource(any()); - } - - @Test - void putsResourceOnTempCacheIfNoEventRecorded() { - var testDeployment = testDeployment(); - var prevTestDeployment = testDeployment(); - prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - - informerEventSource - .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment), - testDeployment); - informerEventSource.handleRecentResourceUpdate(ResourceID.fromResource(testDeployment), - testDeployment, prevTestDeployment); - - verify(eventHandlerMock, times(0)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)).unconditionallyCacheResource(any()); - } - - @Test - void putsResourceOnTempCacheIfNoEventRecordedWithSameResourceVersion() { - var testDeployment = testDeployment(); - var prevTestDeployment = testDeployment(); - prevTestDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - var prevPrevTestDeployment = testDeployment(); - prevPrevTestDeployment.getMetadata().setResourceVersion("-1"); - - informerEventSource - .prepareForCreateOrUpdateEventFiltering(ResourceID.fromResource(testDeployment), - testDeployment); - informerEventSource.onUpdate(prevPrevTestDeployment, prevTestDeployment); - informerEventSource.handleRecentResourceUpdate(ResourceID.fromResource(testDeployment), - testDeployment, prevTestDeployment); - - verify(eventHandlerMock, times(0)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)).unconditionallyCacheResource(any()); - } - @Test void genericFilterForEvents() { informerEventSource.setGenericFilter(r -> false); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java index ef2ddfc4ec..bc82c487f6 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java @@ -46,41 +46,26 @@ public UpdateControl reconcile( if (configMap == null) { var configMapToCreate = createConfigMap(resource); final var resourceID = ResourceID.fromResource(configMapToCreate); - try { - informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID, configMapToCreate); - configMap = - client - .configMaps() - .inNamespace(resource.getMetadata().getNamespace()) - .resource(configMapToCreate) - .create(); - informerEventSource.handleRecentResourceCreate(resourceID, configMap); - } catch (RuntimeException e) { - informerEventSource - .cleanupOnCreateOrUpdateEventFiltering(resourceID); - throw e; - } + configMap = + client + .configMaps() + .inNamespace(resource.getMetadata().getNamespace()) + .resource(configMapToCreate) + .create(); + informerEventSource.handleRecentResourceCreate(resourceID, configMap); } else { ResourceID resourceID = ResourceID.fromResource(configMap); if (!Objects.equals( configMap.getData().get(CONFIG_MAP_TEST_DATA_KEY), resource.getSpec().getValue())) { configMap.getData().put(CONFIG_MAP_TEST_DATA_KEY, resource.getSpec().getValue()); - try { - informerEventSource - .prepareForCreateOrUpdateEventFiltering(resourceID, configMap); - var newConfigMap = - client - .configMaps() - .inNamespace(resource.getMetadata().getNamespace()) - .resource(configMap) - .replace(); - informerEventSource.handleRecentResourceUpdate(resourceID, - newConfigMap, configMap); - } catch (RuntimeException e) { - informerEventSource - .cleanupOnCreateOrUpdateEventFiltering(resourceID); - throw e; - } + var newConfigMap = + client + .configMaps() + .inNamespace(resource.getMetadata().getNamespace()) + .resource(configMap) + .replace(); + informerEventSource.handleRecentResourceUpdate(resourceID, + newConfigMap, configMap); } } return UpdateControl.noUpdate();