Skip to content

Commit

Permalink
Merge pull request #117 from tpantelis/per_resource_ns
Browse files Browse the repository at this point in the history
Allow per-resource namespace in the Federator
  • Loading branch information
vthapar authored Sep 3, 2020
2 parents 271c23f + ebe014b commit a69b58d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 30 deletions.
19 changes: 12 additions & 7 deletions pkg/syncer/broker/federator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -15,18 +16,18 @@ import (
type federator struct {
dynClient dynamic.Interface
restMapper meta.RESTMapper
brokerNamespace string
targetNamespace string
localClusterID string
}

var keepMetadataFields = map[string]bool{"name": true, "namespace": true, util.LabelsField: true, "annotations": true}

func NewFederator(dynClient dynamic.Interface, restMapper meta.RESTMapper, brokerNamespace,
func NewFederator(dynClient dynamic.Interface, restMapper meta.RESTMapper, targetNamespace,
localClusterID string) federate.Federator {
return &federator{
dynClient: dynClient,
restMapper: restMapper,
brokerNamespace: brokerNamespace,
targetNamespace: targetNamespace,
localClusterID: localClusterID,
}
}
Expand All @@ -44,7 +45,6 @@ func (f *federator) Distribute(resource runtime.Object) error {
}

f.prepareResourceForSync(toDistribute)
toDistribute.SetNamespace(f.brokerNamespace)

_, err = util.CreateOrUpdate(resourceClient, toDistribute, func(existing *unstructured.Unstructured) (*unstructured.Unstructured, error) {
// Preserve the existing metadata info (except Labels and Annotations), specifically the ResourceVersion which must
Expand Down Expand Up @@ -75,7 +75,14 @@ func (f *federator) toUnstructured(from runtime.Object) (*unstructured.Unstructu
return nil, nil, err
}

return to, f.dynClient.Resource(*gvr).Namespace(f.brokerNamespace), nil
ns := f.targetNamespace
if ns == corev1.NamespaceAll {
ns = to.GetNamespace()
}

to.SetNamespace(ns)

return to, f.dynClient.Resource(*gvr).Namespace(ns), nil
}

func (f *federator) prepareResourceForSync(resource *unstructured.Unstructured) {
Expand All @@ -86,8 +93,6 @@ func (f *federator) prepareResourceForSync(resource *unstructured.Unstructured)
unstructured.RemoveNestedField(resource.Object, util.MetadataField, field)
}
}

resource.SetNamespace(f.brokerNamespace)
}

func setNestedField(to map[string]interface{}, value interface{}, fields ...string) {
Expand Down
85 changes: 62 additions & 23 deletions pkg/syncer/broker/federator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,32 @@ var _ = Describe("Federator", func() {

func testDistribute() {
var (
f federate.Federator
resource *corev1.Pod
localClusterID string
resourceClient *fake.DynamicResourceClient
initObjs []runtime.Object
f federate.Federator
resource *corev1.Pod
localClusterID string
federatorNamespace string
targetNamespace string
resourceClient *fake.DynamicResourceClient
initObjs []runtime.Object
)

BeforeEach(func() {
localClusterID = "east"
resource = test.NewPod(test.LocalNamespace)
initObjs = nil
federatorNamespace = test.RemoteNamespace
targetNamespace = test.RemoteNamespace
})

JustBeforeEach(func() {
f, resourceClient = setupFederator(resource, initObjs, localClusterID)
f, resourceClient = setupFederator(resource, initObjs, localClusterID, federatorNamespace, targetNamespace)
})

When("the resource does not already exist in the broker datastore", func() {
When("a local cluster ID is specified", func() {
It("should create the resource with the cluster ID label", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, test.RemoteNamespace, localClusterID)
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})
})

Expand All @@ -54,7 +58,7 @@ func testDistribute() {

It("should create the resource without the cluster ID label", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, test.RemoteNamespace, localClusterID)
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})
})

Expand All @@ -68,7 +72,7 @@ func testDistribute() {

It("should create the resource with the Status data", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, test.RemoteNamespace, localClusterID)
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})
})

Expand All @@ -95,21 +99,23 @@ func testDistribute() {

It("should update the resource", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, test.RemoteNamespace, localClusterID)
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})
})
})

When("the resource already exists in the broker datastore", func() {
BeforeEach(func() {
initObjs = append(initObjs, resource)
existing := resource.DeepCopy()
existing.SetNamespace(targetNamespace)
initObjs = append(initObjs, existing)

resource = test.NewPodWithImage(test.LocalNamespace, "apache")
})

It("should update the resource", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, test.RemoteNamespace, localClusterID)
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})

When("update initially fails due to conflict", func() {
Expand All @@ -119,7 +125,7 @@ func testDistribute() {

It("should retry until it succeeds", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, test.RemoteNamespace, localClusterID)
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})
})

Expand All @@ -143,29 +149,46 @@ func testDistribute() {
Expect(f.Distribute(resource)).ToNot(Succeed())
})
})

When("no target namespace is specified", func() {
BeforeEach(func() {
targetNamespace = "another-ns"
federatorNamespace = corev1.NamespaceAll
resource.SetNamespace(targetNamespace)
})

It("should create the resource in the source namespace", func() {
Expect(f.Distribute(resource)).To(Succeed())
test.VerifyResource(resourceClient, resource, targetNamespace, localClusterID)
})
})
}

func testDelete() {
var (
f federate.Federator
resource *corev1.Pod
resourceClient *fake.DynamicResourceClient
initObjs []runtime.Object
f federate.Federator
resource *corev1.Pod
resourceClient *fake.DynamicResourceClient
initObjs []runtime.Object
targetNamespace string
federatorNamespace string
)

BeforeEach(func() {
resource = test.NewPod(test.LocalNamespace)
initObjs = nil
targetNamespace = test.RemoteNamespace
federatorNamespace = test.RemoteNamespace
})

JustBeforeEach(func() {
f, resourceClient = setupFederator(resource, initObjs, "")
f, resourceClient = setupFederator(resource, initObjs, "", federatorNamespace, targetNamespace)
})

When("the resource exists in the broker datastore", func() {
BeforeEach(func() {
existing := resource.DeepCopy()
existing.SetNamespace(test.RemoteNamespace)
existing.SetNamespace(targetNamespace)
initObjs = append(initObjs, existing)
})

Expand All @@ -185,6 +208,22 @@ func testDelete() {
Expect(f.Delete(resource)).ToNot(Succeed())
})
})

When("no target namespace is specified", func() {
BeforeEach(func() {
targetNamespace = "another-ns"
federatorNamespace = corev1.NamespaceAll
initObjs[0].(*corev1.Pod).SetNamespace(targetNamespace)
resource.SetNamespace(targetNamespace)
})

It("should delete the resource from the source namespace", func() {
Expect(f.Delete(resource)).To(Succeed())

_, err := test.GetResourceAndError(resourceClient, resource)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
})
})

When("the resource does not exist in the broker datastore", func() {
Expand All @@ -194,11 +233,11 @@ func testDelete() {
})
}

func setupFederator(resource *corev1.Pod, initObjs []runtime.Object, localClusterID string) (federate.Federator,
func setupFederator(resource *corev1.Pod, initObjs []runtime.Object, localClusterID, federatorNS, targetNS string) (federate.Federator,
*fake.DynamicResourceClient) {
dynClient := fake.NewDynamicClient(test.PrepInitialClientObjs(test.RemoteNamespace, localClusterID, initObjs...)...)
dynClient := fake.NewDynamicClient(test.PrepInitialClientObjs("", localClusterID, initObjs...)...)
restMapper, gvr := test.GetRESTMapperAndGroupVersionResourceFor(resource)
f := broker.NewFederator(dynClient, restMapper, test.RemoteNamespace, localClusterID)
f := broker.NewFederator(dynClient, restMapper, federatorNS, localClusterID)

return f, dynClient.Resource(*gvr).Namespace(test.RemoteNamespace).(*fake.DynamicResourceClient)
return f, dynClient.Resource(*gvr).Namespace(targetNS).(*fake.DynamicResourceClient)
}

0 comments on commit a69b58d

Please sign in to comment.