Skip to content

Commit

Permalink
Ignore Status updates by default
Browse files Browse the repository at this point in the history
Also added a flag to make it configurable.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Aug 13, 2020
1 parent 0819c54 commit 0dc4af4
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 36 deletions.
52 changes: 31 additions & 21 deletions pkg/syncer/broker/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type ResourceConfig struct {

// BrokerTransform function used to transform a broker resource to the equivalent local resource.
BrokerTransform syncer.TransformFunc

// ProcessLocalStatusUpdates specifies whether or not updates to the Status field for local resources should be
// processed or ignored.
ProcessLocalStatusUpdates bool

// ProcessBrokerStatusUpdates specifies whether or not updates to the Status field for broker resources should be
// processed or ignored.
ProcessBrokerStatusUpdates bool
}

type SyncerConfig struct {
Expand Down Expand Up @@ -147,17 +155,18 @@ func NewSyncerWithDetail(config *SyncerConfig, localClient, brokerClient dynamic

for _, rc := range config.ResourceConfigs {
localSyncer, err := syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{
Name: fmt.Sprintf("local -> broker for %T", rc.LocalResourceType),
SourceClient: localClient,
SourceNamespace: rc.LocalSourceNamespace,
LocalClusterID: config.LocalClusterID,
Direction: syncer.LocalToRemote,
RestMapper: restMapper,
Federator: brokerSyncer.remoteFederator,
ResourceType: rc.LocalResourceType,
Transform: rc.LocalTransform,
OnSuccessfulSync: rc.LocalOnSuccessfulSync,
Scheme: config.Scheme,
Name: fmt.Sprintf("local -> broker for %T", rc.LocalResourceType),
SourceClient: localClient,
SourceNamespace: rc.LocalSourceNamespace,
LocalClusterID: config.LocalClusterID,
Direction: syncer.LocalToRemote,
RestMapper: restMapper,
Federator: brokerSyncer.remoteFederator,
ResourceType: rc.LocalResourceType,
Transform: rc.LocalTransform,
OnSuccessfulSync: rc.LocalOnSuccessfulSync,
ProcessStatusUpdates: rc.ProcessLocalStatusUpdates,
Scheme: config.Scheme,
})

if err != nil {
Expand All @@ -167,16 +176,17 @@ func NewSyncerWithDetail(config *SyncerConfig, localClient, brokerClient dynamic
brokerSyncer.syncers = append(brokerSyncer.syncers, localSyncer)

remoteSyncer, err := syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{
Name: fmt.Sprintf("broker -> local for %T", rc.BrokerResourceType),
SourceClient: brokerClient,
SourceNamespace: config.BrokerNamespace,
LocalClusterID: config.LocalClusterID,
Direction: syncer.RemoteToLocal,
RestMapper: restMapper,
Federator: localFederator,
ResourceType: rc.BrokerResourceType,
Transform: rc.BrokerTransform,
Scheme: config.Scheme,
Name: fmt.Sprintf("broker -> local for %T", rc.BrokerResourceType),
SourceClient: brokerClient,
SourceNamespace: config.BrokerNamespace,
LocalClusterID: config.LocalClusterID,
Direction: syncer.RemoteToLocal,
RestMapper: restMapper,
Federator: localFederator,
ResourceType: rc.BrokerResourceType,
Transform: rc.BrokerTransform,
ProcessStatusUpdates: rc.ProcessBrokerStatusUpdates,
Scheme: config.Scheme,
})

if err != nil {
Expand Down
74 changes: 67 additions & 7 deletions pkg/syncer/broker/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/submariner-io/admiral/pkg/syncer/test"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

Expand Down Expand Up @@ -66,11 +67,11 @@ var _ = Describe("Broker Syncer", func() {
It("should correctly sync to the broker datastore", func() {
test.CreateResource(localClient, resource)

test.WaitForResource(brokerClient, resource.GetName())
test.AwaitResource(brokerClient, resource.GetName())
test.VerifyResource(brokerClient, resource, config.BrokerNamespace, config.LocalClusterID)

Expect(localClient.ResourceInterface.Delete(resource.GetName(), nil)).To(Succeed())
test.WaitForNoResource(brokerClient, resource.GetName())
test.AwaitNoResource(brokerClient, resource.GetName())

// Ensure the broker syncer did not try to sync back to the local datastore
localClient.VerifyNoUpdate(resource.GetName())
Expand All @@ -92,11 +93,11 @@ var _ = Describe("Broker Syncer", func() {
test.SetClusterIDLabel(resource, "remote")
test.CreateResource(brokerClient, resource)

test.WaitForResource(localClient, resource.GetName())
test.AwaitResource(localClient, resource.GetName())
test.VerifyResource(localClient, resource, config.LocalNamespace, "remote")

Expect(brokerClient.ResourceInterface.Delete(resource.GetName(), nil)).To(Succeed())
test.WaitForNoResource(localClient, resource.GetName())
test.AwaitNoResource(localClient, resource.GetName())

// Ensure the local syncer did not try to sync back to the broker datastore
brokerClient.VerifyNoUpdate(resource.GetName())
Expand All @@ -122,7 +123,7 @@ var _ = Describe("Broker Syncer", func() {

When("a local resource is created in any namespace", func() {
It("should sync to the broker datastore", func() {
test.WaitForResource(brokerClient, resource.GetName())
test.AwaitResource(brokerClient, resource.GetName())
test.VerifyResource(brokerClient, resource, config.BrokerNamespace, config.LocalClusterID)
})
})
Expand All @@ -140,7 +141,7 @@ var _ = Describe("Broker Syncer", func() {
It("should sync the transformed resource to the broker datastore", func() {
test.CreateResource(localClient, resource)

test.WaitForResource(brokerClient, resource.GetName())
test.AwaitResource(brokerClient, resource.GetName())
test.VerifyResource(brokerClient, transformed, config.BrokerNamespace, config.LocalClusterID)
})
})
Expand All @@ -159,12 +160,71 @@ var _ = Describe("Broker Syncer", func() {
test.SetClusterIDLabel(resource, "remote")
test.CreateResource(brokerClient, resource)

test.WaitForResource(localClient, resource.GetName())
test.AwaitResource(localClient, resource.GetName())
test.VerifyResource(localClient, transformed, config.LocalNamespace, "remote")
})
})
})

When("a local resource's Status is updated in the local datastore", func() {
JustBeforeEach(func() {
test.CreateResource(localClient, resource)
test.AwaitResource(brokerClient, resource.GetName())

resource.Status.Phase = corev1.PodRunning
test.UpdateResource(localClient, resource)
})

When("Status updates are ignored", func() {
It("should not sync to the broker datastore", func() {
brokerClient.VerifyNoUpdate(resource.GetName())
})
})

When("Status updates are not ignored", func() {
BeforeEach(func() {
config.ResourceConfigs[0].ProcessLocalStatusUpdates = true
})

It("should sync to the broker datastore", func() {
test.AwaitAndVerifyResource(brokerClient, resource.GetName(), func(obj *unstructured.Unstructured) bool {
v, _, _ := unstructured.NestedString(obj.Object, "status", "phase")
return corev1.PodPhase(v) == corev1.PodRunning
})
})
})
})

When("a non-local resource's Status is updated in the broker datastore", func() {
JustBeforeEach(func() {
test.SetClusterIDLabel(resource, "remote")
test.CreateResource(brokerClient, resource)
test.AwaitResource(localClient, resource.GetName())

resource.Status.Phase = corev1.PodRunning
test.UpdateResource(brokerClient, resource)
})

When("Status updates are ignored", func() {
It("should not sync to the local datastore", func() {
localClient.VerifyNoUpdate(resource.GetName())
})
})

When("Status updates are not ignored", func() {
BeforeEach(func() {
config.ResourceConfigs[0].ProcessBrokerStatusUpdates = true
})

It("should sync to the local datastore", func() {
test.AwaitAndVerifyResource(localClient, resource.GetName(), func(obj *unstructured.Unstructured) bool {
v, _, _ := unstructured.NestedString(obj.Object, "status", "phase")
return corev1.PodPhase(v) == corev1.PodRunning
})
})
})
})

When("GetBrokerFederatorFor is called", func() {
It("should return the Federator", func() {
Expect(syncer.GetBrokerFederator()).ToNot(BeNil())
Expand Down
11 changes: 9 additions & 2 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ResourceSyncerConfig struct {
// OnSuccessfulSync function invoked after a successful sync operation.
OnSuccessfulSync OnSuccessfulSyncFunc

// ProcessStatusUpdates specifies whether or not updates to the Status field should be processed or ignored.
ProcessStatusUpdates bool

// Scheme used to convert resource objects. By default the global k8s Scheme is used.
Scheme *runtime.Scheme
}
Expand Down Expand Up @@ -329,8 +332,12 @@ func (r *resourceSyncer) onUpdate(oldObj, newObj interface{}) {

equal := reflect.DeepEqual(oldResource.GetLabels(), newResource.GetLabels()) &&
reflect.DeepEqual(oldResource.GetAnnotations(), newResource.GetAnnotations()) &&
equality.Semantic.DeepEqual(r.getSpec(oldResource), r.getSpec(newResource)) &&
equality.Semantic.DeepEqual(r.getStatus(oldResource), r.getStatus(newResource))
equality.Semantic.DeepEqual(r.getSpec(oldResource), r.getSpec(newResource))

if equal && r.config.ProcessStatusUpdates {
equal = equality.Semantic.DeepEqual(r.getStatus(oldResource), r.getStatus(newResource))
}

if equal {
klog.V(log.TRACE).Infof("Syncer %q: objects equivalent on update - not queueing resource\nOLD: %#v\nNEW: %#v",
r.config.Name, oldResource, newResource)
Expand Down
16 changes: 14 additions & 2 deletions pkg/syncer/resource_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,20 @@ func testUpdateSuppression() {
d.resource.Status.Phase = corev1.PodRunning
})

It("should distribute it", func() {
d.federator.VerifyDistribute(test.ToUnstructured(d.resource))
When("Status updates are ignored", func() {
It("should not distribute it", func() {
d.federator.VerifyNoDistribute()
})
})

When("Status updates are not ignored", func() {
BeforeEach(func() {
d.config.ProcessStatusUpdates = true
})

It("should distribute it", func() {
d.federator.VerifyDistribute(test.ToUnstructured(d.resource))
})
})
})

Expand Down
17 changes: 13 additions & 4 deletions pkg/syncer/test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,12 @@ func SetClusterIDLabel(obj runtime.Object, clusterID string) runtime.Object {
return obj
}

func WaitForResource(client dynamic.ResourceInterface, name string) *unstructured.Unstructured {
func AwaitResource(client dynamic.ResourceInterface, name string) *unstructured.Unstructured {
return AwaitAndVerifyResource(client, name, nil)
}

func AwaitAndVerifyResource(client dynamic.ResourceInterface, name string,
verify func(*unstructured.Unstructured) bool) *unstructured.Unstructured {
var found *unstructured.Unstructured

err := wait.PollImmediate(50*time.Millisecond, 5*time.Second, func() (bool, error) {
Expand All @@ -210,16 +215,20 @@ func WaitForResource(client dynamic.ResourceInterface, name string) *unstructure
return false, err
}

found = obj
return true, nil
if verify == nil || verify(obj) {
found = obj
return true, nil
}

return false, nil
})

Expect(err).To(Succeed())

return found
}

func WaitForNoResource(client dynamic.ResourceInterface, name string) {
func AwaitNoResource(client dynamic.ResourceInterface, name string) {
err := wait.PollImmediate(50*time.Millisecond, 5*time.Second, func() (bool, error) {
_, err := client.Get(name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
Expand Down

0 comments on commit 0dc4af4

Please sign in to comment.