diff --git a/transfer/pvc.go b/transfer/pvc.go new file mode 100644 index 0000000..5a77fc3 --- /dev/null +++ b/transfer/pvc.go @@ -0,0 +1,34 @@ +package transfer + +import ( + corev1 "k8s.io/api/core/v1" +) + +// PVC knows how to return a v1.PersistentVolumeClaim and an additional validated +// name which can be used by different transfers as per their own requirements +type PVC interface { + // Claim returns the v1.PersistentVolumeClaim reference this PVC is associated with + Claim() *corev1.PersistentVolumeClaim + // LabelSafeName returns a name for the PVC that can be used as a label value; + // it may be validated differently by different transfers + LabelSafeName() string +} + +type PVCList interface { + GetNamespaces() []string + InNamespace(ns string) PVCList + PVCs() []PVC +} + +// GetNames takes a PVCList and returns a map keyed by namespace, where each value is +// a unique md5 hash computed from the names of the PVCs in that namespace. +func GetNames(pvcs PVCList) map[string]string { + p := map[string]string{} + for _, pvc := range pvcs.PVCs() { + p[pvc.Claim().Namespace] += pvc.Claim().Name + } + for ns := range p { + p[ns] = getMD5Hash(p[ns]) + } + return p +} diff --git a/transfer/pvc_list.go b/transfer/pvc_list.go new file mode 100644 index 0000000..8c944d3 --- /dev/null +++ b/transfer/pvc_list.go @@ -0,0 +1,94 @@ +package transfer + +import ( + "crypto/md5" + "encoding/hex" + + corev1 "k8s.io/api/core/v1" +) + +// pvc represents a PersistentVolumeClaim +type pvc struct { + p *corev1.PersistentVolumeClaim +} + +// Claim returns a reference to the associated PersistentVolumeClaim +func (p pvc) Claim() *corev1.PersistentVolumeClaim { + return p.p +} + +// LabelSafeName returns a name which is guaranteed to be a safe label value +func (p pvc) LabelSafeName() string { + return getMD5Hash(p.p.Name) +} + +func getMD5Hash(s string) string { + hash := md5.Sum([]byte(s)) + return hex.EncodeToString(hash[:]) +} + +// pvcList defines a managed list of PVCs +type pvcList []PVC + +// NewPVCList, when given a list of PersistentVolumeClaims, returns a managed list +func NewPVCList(pvcs ...*corev1.PersistentVolumeClaim) (PVCList, error) { + pvcList := pvcList{} + for _, p := range pvcs { + if p != nil { + pvcList = append(pvcList, pvc{p}) + } + // TODO: log an error here; the pvc list has an invalid (nil) entry + } + return pvcList, nil +} + +// GetNamespaces returns all the namespaces present in the list of pvcs +func (p pvcList) GetNamespaces() (namespaces []string) { + nsSet := map[string]bool{} + for i := range p { + pvc := p[i] + if _, exists := nsSet[pvc.Claim().Namespace]; !exists { + nsSet[pvc.Claim().Namespace] = true + namespaces = append(namespaces, pvc.Claim().Namespace) + } + } + return +} + +// InNamespace, given a destination namespace, returns a list of pvcs that will be migrated to it +func (p pvcList) InNamespace(ns string) PVCList { + pvcList := pvcList{} + for i := range p { + pvc := p[i] + if pvc.Claim().Namespace == ns { + pvcList = append(pvcList, pvc) + } + } + return pvcList +} + +func (p pvcList) PVCs() []PVC { + pvcs := []PVC{} + for i := range p { + if p[i].Claim() != nil { + pvcs = append(pvcs, p[i]) + } + } + return pvcs +} + +type singletonPVC struct { + pvc *corev1.PersistentVolumeClaim +} + +func (s singletonPVC) Claim() *corev1.PersistentVolumeClaim { + return s.pvc +} + +func (s singletonPVC) LabelSafeName() string { + return "data" +} + +func NewSingletonPVC(pvc *corev1.PersistentVolumeClaim) PVCList { + return pvcList([]PVC{singletonPVC{pvc}}) +} diff --git a/transfer/rsync/rsync.go b/transfer/rsync/rsync.go
new file mode 100644 index 0000000..bebdf3b --- /dev/null +++ b/transfer/rsync/rsync.go @@ -0,0 +1,39 @@ +package rsync + +import ( + "github.com/backube/pvc-transfer/transfer" + corev1 "k8s.io/api/core/v1" +) + +const ( + RsyncContainer = "rsync" +) + +const ( + rsyncImage = "quay.io/konveyor/rsync-transfer:latest" + rsyncConfig = "backube-rsync-config" + rsyncSecretPrefix = "backube-rsync" + rsyncServiceAccount = "backube-rsync-sa" + rsyncRole = "backube-rsync-role" + rsyncRoleBinding = "backube-rsync-rolebinding" + rsyncdLogDir = "rsyncd-logs" + rsyncdLogDirPath = "/var/log/rsyncd/" +) + +// applyPodOptions takes a PodSpec and PodOptions and applies +// each option to the given podSpec. +// The following fields will be mutated: +// - spec.NodeSelector +// - spec.SecurityContext +// - spec.NodeName +// - spec.Containers[*].SecurityContext +// - spec.Containers[*].Resources +func applyPodOptions(podSpec *corev1.PodSpec, options transfer.PodOptions) { + podSpec.NodeSelector = options.NodeSelector + podSpec.NodeName = options.NodeName + podSpec.SecurityContext = &options.PodSecurityContext + for i := range podSpec.Containers { + podSpec.Containers[i].SecurityContext = &options.ContainerSecurityContext + podSpec.Containers[i].Resources = options.Resources + } +} diff --git a/transfer/rsync/server.go b/transfer/rsync/server.go new file mode 100644 index 0000000..ad45623 --- /dev/null +++ b/transfer/rsync/server.go @@ -0,0 +1,562 @@ +package rsync + +import ( + "bytes" + "context" + "fmt" + "strconv" + "text/template" + + "github.com/backube/pvc-transfer/endpoint" + "github.com/backube/pvc-transfer/internal/utils" + "github.com/backube/pvc-transfer/transfer" + "github.com/backube/pvc-transfer/transport" + "github.com/backube/pvc-transfer/transport/stunnel" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +// AddToScheme should be used as soon as the scheme is created to add +// the kube objects required by this package for encoding/decoding +func AddToScheme(scheme *runtime.Scheme) error { + err := corev1.AddToScheme(scheme) + if err != nil { + return err + } + err = rbacv1.AddToScheme(scheme) + if err != nil { + return err + } + return nil +} + +// APIsToWatch gives a list of APIs to watch if using this package +// to deploy the endpoint +func APIsToWatch() ([]client.Object, error) { + return []client.Object{ + &corev1.Secret{}, + &corev1.ConfigMap{}, + &corev1.Pod{}, + &corev1.ServiceAccount{}, + &rbacv1.RoleBinding{}, + &rbacv1.Role{}, + }, nil +} + +// DefaultSCCName is the default name of the security context constraint +const DefaultSCCName = "pvc-transfer-mover" + +const ( + rsyncServerConfTemplate = `syslog facility = local7 +read only = no +list = yes +log file = /dev/stdout +max verbosity = 4 +auth users = {{ $.Username }} +{{- if $.AllowLocalhostOnly }} +hosts allow = ::1, 127.0.0.1, localhost +{{- else }} +hosts allow = *.*.*.*, * +{{- end }} +uid = root +gid = root +{{ range $i, $pvc := .PVCList }} +[{{ $pvc.LabelSafeName }}] + comment = archive for {{ $pvc.Claim.Namespace }}/{{ $pvc.Claim.Name }} + path = /mnt/{{ $pvc.Claim.Namespace }}/{{ $pvc.LabelSafeName }} + use chroot = no + munge symlinks = no + list = yes + read only = false + auth users = {{ $.Username }} + secrets file = /etc/rsync-secret/rsyncd.secrets +{{ end }} +` +) + +type rsyncConfigData struct { + Username string + PVCList transfer.PVCList +
AllowLocalhostOnly bool +} + +type reconcileFunc func(ctx context.Context, c client.Client, namespace string) error + +type server struct { + username string + password string + pvcList transfer.PVCList + transportServer transport.Transport + endpoint endpoint.Endpoint + listenPort int32 + + nameSuffix string + + labels map[string]string + ownerRefs []metav1.OwnerReference + options transfer.PodOptions +} + +func (s *server) Endpoint() endpoint.Endpoint { + return s.endpoint +} + +func (s *server) Transport() transport.Transport { + return s.transportServer +} + +func (s *server) IsHealthy(ctx context.Context, c client.Client) (bool, error) { + return transfer.IsPodHealthy(ctx, c, client.ObjectKey{Namespace: s.pvcList.GetNamespaces()[0], Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix)}) +} + +func (s *server) Completed(ctx context.Context, c client.Client) (bool, error) { + return transfer.IsPodCompleted(ctx, c, client.ObjectKey{Namespace: s.pvcList.GetNamespaces()[0], Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix)}, "rsync") +} + +// MarkForCleanup marks the provided "obj" to be deleted at the end of the +// synchronization iteration. +func (s *server) MarkForCleanup(ctx context.Context, c client.Client, key, value string) error { + // mark endpoint for deletion + err := s.Endpoint().MarkForCleanup(ctx, c, key, value) + if err != nil { + return err + } + + // mark transport for deletion + err = s.Transport().MarkForCleanup(ctx, c, key, value) + if err != nil { + return err + } + + namespace := s.pvcList.GetNamespaces()[0] + // update configmap + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncConfig, s.nameSuffix), + Namespace: namespace, + }, + } + err = utils.UpdateWithLabel(ctx, c, cm, key, value) + if err != nil { + return err + } + // update secret + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncSecretPrefix, s.nameSuffix), + Namespace: namespace, + }, + } + err = utils.UpdateWithLabel(ctx, c, secret, key, value) + if err != nil { + return err + } + + // update pod + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix), + Namespace: namespace, + }, + } + err = utils.UpdateWithLabel(ctx, c, pod, key, value) + if err != nil { + return err + } + + // update service account + sa := &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, s.nameSuffix), + Namespace: namespace, + }, + } + err = utils.UpdateWithLabel(ctx, c, sa, key, value) + if err != nil { + return err + } + + // update role + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRole, s.nameSuffix), + Namespace: namespace, + }, + } + err = utils.UpdateWithLabel(ctx, c, role, key, value) + if err != nil { + return err + } + + // update rolebinding + roleBinding := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, s.nameSuffix), + Namespace: namespace, + }, + } + return utils.UpdateWithLabel(ctx, c, roleBinding, key, value) +} + +func (s *server) PVCs() []*corev1.PersistentVolumeClaim { + pvcs := []*corev1.PersistentVolumeClaim{} + for _, pvc := range s.pvcList.PVCs() { + pvcs = append(pvcs, pvc.Claim()) + } + return pvcs +} + +func (s *server) ListenPort() int32 { + return s.listenPort +} + +func NewRsyncTransferServer(ctx context.Context, c client.Client, + pvcList transfer.PVCList, + t transport.Transport, + e endpoint.Endpoint, + labels 
map[string]string, + ownerRefs []metav1.OwnerReference, + password string, podOptions transfer.PodOptions) (transfer.Server, error) { + + // TODO: add proper validation for podOptions + if podOptions.ContainerSecurityContext.RunAsUser != nil && *podOptions.ContainerSecurityContext.RunAsUser != 0 { + return nil, fmt.Errorf("running as non-root user is not supported yet") + } + + r := &server{ + username: "root", + password: password, + pvcList: pvcList, + transportServer: t, + endpoint: e, + listenPort: t.ConnectPort(), + labels: labels, + ownerRefs: ownerRefs, + options: podOptions, + } + + var namespace string + namespaces := pvcList.GetNamespaces() + if len(namespaces) > 0 { + namespace = pvcList.GetNamespaces()[0] + } + + for _, ns := range namespaces { + if ns != namespace { + return nil, fmt.Errorf("PVC list provided has pvcs in different namespaces which is not supported") + } + } + + if namespace == "" { + return nil, fmt.Errorf("ether PVC list is empty or namespace is not specified") + } + + r.nameSuffix = transfer.GetNames(pvcList)[namespace][:10] + reconcilers := []reconcileFunc{ + r.reconcileConfigMap, + r.reconcileSecret, + r.reconcileServiceAccount, + r.reconcileRole, + r.reconcileRoleBinding, + r.reconcilePod, + } + + for _, reconcile := range reconcilers { + err := reconcile(ctx, c, namespace) + if err != nil { + return nil, err + } + } + + return r, nil +} + +func (s *server) reconcileConfigMap(ctx context.Context, c client.Client, namespace string) error { + var rsyncConf bytes.Buffer + rsyncConfTemplate, err := template.New("config").Parse(rsyncServerConfTemplate) + if err != nil { + return err + } + + allowLocalhostOnly := s.Transport().Type() == stunnel.TransportTypeStunnel + configdata := rsyncConfigData{ + Username: s.username, + PVCList: s.pvcList.InNamespace(namespace), + AllowLocalhostOnly: allowLocalhostOnly, + } + + err = rsyncConfTemplate.Execute(&rsyncConf, configdata) + if err != nil { + return err + } + + rsyncConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("%s-%s", rsyncConfig, s.nameSuffix), + }, + } + + _, err = ctrlutil.CreateOrUpdate(ctx, c, rsyncConfigMap, func() error { + rsyncConfigMap.Labels = s.labels + rsyncConfigMap.OwnerReferences = s.ownerRefs + rsyncConfigMap.Data = map[string]string{ + "rsyncd.conf": rsyncConf.String(), + } + return nil + }) + return err +} + +func (s *server) reconcileSecret(ctx context.Context, c client.Client, namespace string) error { + if s.password == "" { + return fmt.Errorf("password is empty") + } + + rsyncSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("%s-%s", rsyncSecretPrefix, s.nameSuffix), + }, + } + + _, err := ctrlutil.CreateOrUpdate(ctx, c, rsyncSecret, func() error { + rsyncSecret.Labels = s.labels + rsyncSecret.OwnerReferences = s.ownerRefs + rsyncSecret.Data = map[string][]byte{ + "credentials": []byte(s.username + ":" + s.password), + } + return nil + }) + + return err +} + +func (s *server) reconcilePod(ctx context.Context, c client.Client, namespace string) error { + volumeMounts := []corev1.VolumeMount{} + configVolumeMounts := s.getConfigVolumeMounts() + pvcVolumeMounts := s.getPVCVolumeMounts(namespace) + + volumeMounts = append(volumeMounts, configVolumeMounts...) + volumeMounts = append(volumeMounts, pvcVolumeMounts...) + containers := s.getContainers(volumeMounts) + + containers = append(containers, s.Transport().Containers()...) 
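+ // The pod spec assembled below runs the rsync daemon container alongside the sidecar containers contributed by the transport (for example stunnel), and mounts the rsyncd config/secret/log volumes, one volume per PVC in this namespace, plus any volumes the transport itself requires.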
+ + mode := int32(0600) + + configVolumes := s.getConfigVolumes(mode) + pvcVolumes := s.getPVCVolumes(namespace) + + volumes := append(pvcVolumes, configVolumes...) + volumes = append(volumes, s.Transport().Volumes()...) + + podSpec := corev1.PodSpec{ + Containers: containers, + Volumes: volumes, + RestartPolicy: corev1.RestartPolicyNever, + ServiceAccountName: fmt.Sprintf("%s-%s", rsyncServiceAccount, s.nameSuffix), + } + + applyPodOptions(&podSpec, s.options) + + server := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix), + Namespace: namespace, + }, + Spec: podSpec, + } + + _, err := ctrlutil.CreateOrUpdate(ctx, c, server, func() error { + server.Labels = s.labels + server.OwnerReferences = s.ownerRefs + server.Spec = podSpec + return nil + }) + return err +} + +func (s *server) getConfigVolumes(mode int32) []corev1.Volume { + return []corev1.Volume{ + { + Name: fmt.Sprintf("%s-%s", rsyncConfig, s.nameSuffix), + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: fmt.Sprintf("%s-%s", rsyncConfig, s.nameSuffix), + }, + }, + }, + }, + { + Name: fmt.Sprintf("%s-%s", rsyncSecretPrefix, s.nameSuffix), + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: fmt.Sprintf("%s-%s", rsyncSecretPrefix, s.nameSuffix), + DefaultMode: &mode, + Items: []corev1.KeyToPath{ + { + Key: "credentials", + Path: "rsyncd.secrets", + }, + }, + }, + }, + }, + { + Name: rsyncdLogDir, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + } +} + +func (s *server) getPVCVolumeMounts(namespace string) []corev1.VolumeMount { + pvcVolumeMounts := []corev1.VolumeMount{} + for _, pvc := range s.pvcList.InNamespace(namespace).PVCs() { + pvcVolumeMounts = append( + pvcVolumeMounts, + corev1.VolumeMount{ + Name: pvc.LabelSafeName(), + MountPath: fmt.Sprintf("/mnt/%s/%s", pvc.Claim().Namespace, pvc.LabelSafeName()), + }) + } + return pvcVolumeMounts +} + +func (s *server) getContainers(volumeMounts []corev1.VolumeMount) []corev1.Container { + rsyncCommandTemplate := `/usr/bin/rsync --daemon --no-detach --port=` + strconv.Itoa(int(s.ListenPort())) + ` -vvv | tee ` + rsyncdLogDirPath + `rsync.log & +while true; do + grep "_exit_cleanup" ` + rsyncdLogDirPath + `rsync.log >> /dev/null + if [[ $? 
-eq 0 ]] + then + exit 0; + fi + sleep 1; +done` + + return []corev1.Container{ + { + Name: RsyncContainer, + Image: rsyncImage, + Command: []string{ + "/bin/bash", + "-c", + rsyncCommandTemplate, + }, + Ports: []corev1.ContainerPort{ + { + Name: "rsyncd", + Protocol: corev1.ProtocolTCP, + ContainerPort: s.ListenPort(), + }, + }, + VolumeMounts: volumeMounts, + }, + } +} + +func (s *server) getPVCVolumes(namespace string) []corev1.Volume { + pvcVolumes := []corev1.Volume{} + for _, pvc := range s.pvcList.InNamespace(namespace).PVCs() { + pvcVolumes = append( + pvcVolumes, + corev1.Volume{ + Name: pvc.LabelSafeName(), + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Claim().Name, + }, + }, + }, + ) + } + return pvcVolumes +} + +func (s *server) getConfigVolumeMounts() []corev1.VolumeMount { + return []corev1.VolumeMount{ + { + Name: fmt.Sprintf("%s-%s", rsyncConfig, s.nameSuffix), + MountPath: "/etc/rsyncd.conf", + SubPath: "rsyncd.conf", + }, + { + Name: fmt.Sprintf("%s-%s", rsyncSecretPrefix, s.nameSuffix), + MountPath: "/etc/rsync-secret", + }, + { + Name: rsyncdLogDir, + MountPath: rsyncdLogDirPath, + }, + } +} + +func (s *server) reconcileServiceAccount(ctx context.Context, c client.Client, namespace string) error { + sa := &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, s.nameSuffix), + Namespace: namespace, + }, + } + _, err := ctrlutil.CreateOrUpdate(ctx, c, sa, func() error { + sa.Labels = s.labels + sa.OwnerReferences = s.ownerRefs + return nil + }) + return err +} + +func (s *server) reconcileRole(ctx context.Context, c client.Client, namespace string) error { + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRole, s.nameSuffix), + Namespace: namespace, + }, + } + _, err := ctrlutil.CreateOrUpdate(ctx, c, role, func() error { + role.OwnerReferences = s.ownerRefs + role.Labels = s.labels + role.Rules = []rbacv1.PolicyRule{ + { + APIGroups: []string{"security.openshift.io"}, + Resources: []string{"securitycontextconstraints"}, + // Must match the name of the SCC that is deployed w/ the operator + // config/openshift/mover_scc.yaml + ResourceNames: []string{DefaultSCCName}, + Verbs: []string{"use"}, + }, + } + return nil + }) + return err +} + +func (s *server) reconcileRoleBinding(ctx context.Context, c client.Client, namespace string) error { + roleBinding := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, s.nameSuffix), + Namespace: namespace, + }, + } + _, err := ctrlutil.CreateOrUpdate(ctx, c, roleBinding, func() error { + roleBinding.OwnerReferences = s.ownerRefs + roleBinding.Labels = s.labels + roleBinding.RoleRef = rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: fmt.Sprintf("%s-%s", rsyncRole, s.nameSuffix), + } + roleBinding.Subjects = []rbacv1.Subject{ + {Kind: "ServiceAccount", Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, s.nameSuffix)}, + } + return nil + }) + return err +} diff --git a/transfer/rsync/server_test.go b/transfer/rsync/server_test.go new file mode 100644 index 0000000..42ee1da --- /dev/null +++ b/transfer/rsync/server_test.go @@ -0,0 +1,705 @@ +package rsync + +import ( + "context" + "fmt" + rbacv1 "k8s.io/api/rbac/v1" + "reflect" + "strings" + "testing" + + "github.com/backube/pvc-transfer/transfer" + "github.com/backube/pvc-transfer/transport" + "github.com/backube/pvc-transfer/transport/stunnel" 
+ corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func testOwnerReferences() []metav1.OwnerReference { + return []metav1.OwnerReference{{ + APIVersion: "api.foo", + Kind: "Test", + Name: "bar", + UID: "123", + Controller: pointer.Bool(true), + BlockOwnerDeletion: pointer.Bool(true), + }} +} + +type fakeTransportServer struct { + transportType transport.Type +} + +func (f *fakeTransportServer) NamespacedName() types.NamespacedName { + panic("implement me") +} + +func (f *fakeTransportServer) ListenPort() int32 { + panic("implement me") +} + +func (f *fakeTransportServer) ConnectPort() int32 { + panic("implement me") +} + +func (f *fakeTransportServer) Containers() []corev1.Container { + return []corev1.Container{{Name: "fakeTransportServerContainer"}} +} + +func (f *fakeTransportServer) Volumes() []corev1.Volume { + return []corev1.Volume{{ + Name: "fakeVolume", + }} +} + +func (f *fakeTransportServer) Type() transport.Type { + return f.transportType +} + +func (f *fakeTransportServer) Credentials() types.NamespacedName { + panic("implement me") +} + +func (f *fakeTransportServer) Hostname() string { + panic("implement me") +} + +func (f *fakeTransportServer) MarkForCleanup(ctx context.Context, c client.Client, key, value string) error { + panic("implement me") +} + +func fakeClientWithObjects(objs ...ctrlclient.Object) ctrlclient.WithWatch { + scheme := runtime.NewScheme() + _ = AddToScheme(scheme) + return fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() +} + +func Test_server_reconcileConfigMap(t *testing.T) { + tests := []struct { + name string + username string + pvcList transfer.PVCList + transportServer transport.Transport + labels map[string]string + ownerRefs []metav1.OwnerReference + namespace string + wantErr bool + nameSuffix string + objects []client.Object + }{ + { + name: "test with no configmap", + username: "root", + pvcList: transfer.NewSingletonPVC(&corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "foo", + }, + }), + transportServer: &fakeTransportServer{stunnel.TransportTypeStunnel}, + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{}, + }, + { + name: "test with invalid configmap", + username: "root", + pvcList: transfer.NewSingletonPVC(&corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "foo", + }, + }), + transportServer: &fakeTransportServer{stunnel.TransportTypeStunnel}, + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: rsyncConfig + "-foo", + Namespace: "foo", + Labels: map[string]string{"foo": "bar"}, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + }, + }, + { + name: "test with valid configmap", + username: "root", + pvcList: transfer.NewSingletonPVC(&corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "foo", + }, + }), + transportServer: &fakeTransportServer{stunnel.TransportTypeStunnel}, + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + 
wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: rsyncConfig + "-foo", + Namespace: "foo", + Labels: map[string]string{"test": "me"}, + OwnerReferences: testOwnerReferences(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fakeClientWithObjects(tt.objects...) + s := &server{ + nameSuffix: tt.nameSuffix, + username: tt.username, + pvcList: tt.pvcList, + labels: tt.labels, + ownerRefs: tt.ownerRefs, + transportServer: tt.transportServer, + } + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := s.reconcileConfigMap(ctx, fakeClient, tt.namespace); (err != nil) != tt.wantErr { + t.Errorf("reconcileConfigMap() error = %v, wantErr %v", err, tt.wantErr) + } + cm := &corev1.ConfigMap{} + err := fakeClient.Get(context.Background(), types.NamespacedName{ + Namespace: tt.namespace, + Name: rsyncConfig + "-" + tt.nameSuffix, + }, cm) + if err != nil { + panic(fmt.Errorf("%#v should not be getting error from fake client", err)) + } + + configData, ok := cm.Data["rsyncd.conf"] + if !ok { + t.Error("unable to find rsyncd config data in configmap") + } + if !strings.Contains(configData, "syslog facility = local7") { + t.Error("configmap data does not contain the right data") + } + + if !reflect.DeepEqual(cm.Labels, tt.labels) { + t.Error("configmap does not have the right labels") + } + + if !reflect.DeepEqual(cm.OwnerReferences, tt.ownerRefs) { + t.Error("configmap does not have the right owner references") + } + }) + } +} + +func Test_server_reconcileSecret(t *testing.T) { + tests := []struct { + name string + username string + password string + labels map[string]string + ownerRefs []metav1.OwnerReference + namespace string + wantErr bool + nameSuffix string + objects []client.Object + }{ + { + name: "test if password is empty", + username: "root", + password: "", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: true, + nameSuffix: "foo", + objects: []client.Object{}, + }, + { + name: "secret with invalid data", + username: "root", + password: "root", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "backube-rsync-foo", + Namespace: "foo", + Labels: map[string]string{"foo": "bar"}, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + }, + }, + { + name: "secret with valid data", + username: "root", + password: "root", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "backube-rsync-foo", + Namespace: "foo", + Labels: map[string]string{"test": "me"}, + OwnerReferences: testOwnerReferences(), + }, + Data: map[string][]byte{ + "credentials": []byte("root:root"), + }, + }, + }, + }, + } + for _, tt := range tests { + fakeClient := fakeClientWithObjects(tt.objects...) 
+ ctx := context.WithValue(context.Background(), "test", tt.name) + + t.Run(tt.name, func(t *testing.T) { + s := &server{ + username: tt.username, + password: tt.password, + labels: tt.labels, + ownerRefs: tt.ownerRefs, + nameSuffix: tt.nameSuffix, + } + err := s.reconcileSecret(ctx, fakeClient, tt.namespace) + if (err != nil) != tt.wantErr { + t.Errorf("reconcileSecret() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr { + return + } + secret := &corev1.Secret{} + err = fakeClient.Get(context.Background(), types.NamespacedName{ + Namespace: tt.namespace, + Name: rsyncSecretPrefix + "-" + tt.nameSuffix, + }, secret) + if err != nil { + panic(fmt.Errorf("%#v should not be getting error from fake client", err)) + } + + secretData, ok := secret.Data["credentials"] + if !ok { + t.Error("unable to find credentials in secret") + } + if !strings.Contains(string(secretData), tt.username+":"+tt.password) { + t.Error("secrets does not contain the right data") + } + + if !reflect.DeepEqual(secret.Labels, tt.labels) { + t.Error("secret does not have the right labels") + } + + if !reflect.DeepEqual(secret.OwnerReferences, tt.ownerRefs) { + t.Error("secret does not have the right owner references") + } + }) + } +} + +func Test_server_reconcileServiceAccount(t *testing.T) { + tests := []struct { + name string + labels map[string]string + ownerRefs []metav1.OwnerReference + namespace string + wantErr bool + nameSuffix string + objects []client.Object + }{ + { + name: "test with missing service account", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{}, + }, + { + name: "test with invalid service account", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, "foo"), + Namespace: "foo", + Labels: map[string]string{"foo": "bar"}, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + }, + }, + { + name: "test with valid service account", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, "foo"), + Namespace: "foo", + Labels: map[string]string{"test": "me"}, + OwnerReferences: testOwnerReferences(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fakeClientWithObjects(tt.objects...) 
+ ctx := context.WithValue(context.Background(), "test", tt.name) + + s := &server{ + nameSuffix: tt.nameSuffix, + labels: tt.labels, + ownerRefs: tt.ownerRefs, + } + if err := s.reconcileServiceAccount(ctx, fakeClient, tt.namespace); (err != nil) != tt.wantErr { + t.Errorf("reconcileServiceAccount() error = %v, wantErr %v", err, tt.wantErr) + } + + sa := &corev1.ServiceAccount{} + err := fakeClient.Get(context.Background(), types.NamespacedName{ + Namespace: tt.namespace, + Name: rsyncServiceAccount + "-" + tt.nameSuffix, + }, sa) + if err != nil { + panic(fmt.Errorf("%#v should not be getting error from fake client", err)) + } + + if !reflect.DeepEqual(sa.Labels, tt.labels) { + t.Error("sa does not have the right labels") + } + if !reflect.DeepEqual(sa.OwnerReferences, tt.ownerRefs) { + t.Error("sa does not have the right owner references") + } + }) + } +} + +func Test_server_reconcileRole(t *testing.T) { + tests := []struct { + name string + labels map[string]string + ownerRefs []metav1.OwnerReference + namespace string + wantErr bool + nameSuffix string + objects []client.Object + }{ + { + name: "test with missing role", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{}, + }, + { + name: "test with invalid role", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRole, "foo"), + Namespace: "foo", + Labels: map[string]string{"foo": "bar"}, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + }, + }, + { + name: "test with valid role", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRole, "foo"), + Namespace: "foo", + Labels: map[string]string{"test": "me"}, + OwnerReferences: testOwnerReferences(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &server{ + nameSuffix: tt.nameSuffix, + labels: tt.labels, + ownerRefs: tt.ownerRefs, + } + + fakeClient := fakeClientWithObjects(tt.objects...) 
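+ // The fake client is pre-seeded with tt.objects, so reconcileRole exercises the create path when no Role exists and the update path when one is present with stale labels and owner references.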
+ ctx := context.WithValue(context.Background(), "test", tt.name) + + if err := s.reconcileRole(ctx, fakeClient, tt.namespace); (err != nil) != tt.wantErr { + t.Errorf("reconcileRole() error = %v, wantErr %v", err, tt.wantErr) + } + + role := &rbacv1.Role{} + err := fakeClient.Get(context.Background(), types.NamespacedName{ + Namespace: tt.namespace, + Name: rsyncRole + "-" + tt.nameSuffix, + }, role) + if err != nil { + panic(fmt.Errorf("%#v should not be getting error from fake client", err)) + } + + if !reflect.DeepEqual(role.Labels, tt.labels) { + t.Error("role does not have the right labels") + } + if !reflect.DeepEqual(role.OwnerReferences, tt.ownerRefs) { + t.Error("role does not have the right owner references") + } + + }) + } +} + +func Test_server_reconcileRoleBinding(t *testing.T) { + tests := []struct { + name string + labels map[string]string + ownerRefs []metav1.OwnerReference + namespace string + wantErr bool + nameSuffix string + objects []client.Object + }{ + { + name: "test with missing rolebinding", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{}, + }, + { + name: "test with invalid rolebinding", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, "foo"), + Namespace: "foo", + Labels: map[string]string{"foo": "bar"}, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + }, + }, + { + name: "test with valid rolebinding", + namespace: "foo", + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, "foo"), + Namespace: "foo", + Labels: map[string]string{"test": "me"}, + OwnerReferences: testOwnerReferences(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &server{ + nameSuffix: tt.nameSuffix, + labels: tt.labels, + ownerRefs: tt.ownerRefs, + } + + fakeClient := fakeClientWithObjects(tt.objects...) 
+ ctx := context.WithValue(context.Background(), "test", tt.name) + + if err := s.reconcileRoleBinding(ctx, fakeClient, tt.namespace); (err != nil) != tt.wantErr { + t.Errorf("reconcileRoleBinding() error = %v, wantErr %v", err, tt.wantErr) + } + rolebinding := &rbacv1.RoleBinding{} + err := fakeClient.Get(context.Background(), types.NamespacedName{ + Namespace: tt.namespace, + Name: rsyncRoleBinding + "-" + tt.nameSuffix, + }, rolebinding) + if err != nil { + panic(fmt.Errorf("%#v should not be getting error from fake client", err)) + } + + if !reflect.DeepEqual(rolebinding.Labels, tt.labels) { + t.Error("rolebinding does not have the right labels") + } + if !reflect.DeepEqual(rolebinding.OwnerReferences, tt.ownerRefs) { + t.Error("rolebinding does not have the right owner references") + } + + }) + } +} + +func Test_server_reconcilePod(t *testing.T) { + tests := []struct { + name string + username string + pvcList transfer.PVCList + transportServer transport.Transport + labels map[string]string + ownerRefs []metav1.OwnerReference + namespace string + wantErr bool + nameSuffix string + listenPort int32 + objects []client.Object + }{ + { + name: "test with no pod", + username: "root", + pvcList: transfer.NewSingletonPVC(&corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "foo", + }, + }), + listenPort: 8080, + transportServer: &fakeTransportServer{stunnel.TransportTypeStunnel}, + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{}, + }, + { + name: "test with invalid pod", + username: "root", + pvcList: transfer.NewSingletonPVC(&corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "foo", + }, + }), + listenPort: 8080, + transportServer: &fakeTransportServer{stunnel.TransportTypeStunnel}, + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rsync-server-foo", + Namespace: "foo", + Labels: map[string]string{"foo": "bar"}, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + }, + }, + { + name: "test with valid pod", + username: "root", + pvcList: transfer.NewSingletonPVC(&corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "foo", + }, + }), + listenPort: 8080, + transportServer: &fakeTransportServer{stunnel.TransportTypeStunnel}, + labels: map[string]string{"test": "me"}, + ownerRefs: testOwnerReferences(), + wantErr: false, + nameSuffix: "foo", + objects: []client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rsync-server-foo", + Namespace: "foo", + Labels: map[string]string{"test": "me"}, + OwnerReferences: testOwnerReferences(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fakeClientWithObjects(tt.objects...) 
+ ctx := context.WithValue(context.Background(), "test", tt.name) + s := &server{ + username: tt.username, + pvcList: tt.pvcList, + transportServer: tt.transportServer, + listenPort: tt.listenPort, + nameSuffix: tt.nameSuffix, + labels: tt.labels, + ownerRefs: tt.ownerRefs, + } + if err := s.reconcilePod(ctx, fakeClient, tt.namespace); (err != nil) != tt.wantErr { + t.Errorf("reconcilePod() error = %v, wantErr %v", err, tt.wantErr) + } + + pod := &corev1.Pod{} + err := fakeClient.Get(context.Background(), types.NamespacedName{ + Namespace: tt.namespace, + Name: "rsync-server-" + tt.nameSuffix, + }, pod) + if err != nil { + panic(fmt.Errorf("%#v should not be getting error from fake client", err)) + } + + if !reflect.DeepEqual(pod.Labels, tt.labels) { + t.Error("pod does not have the right labels") + } + if !reflect.DeepEqual(pod.OwnerReferences, tt.ownerRefs) { + t.Error("pod does not have the right owner references") + } + }) + } +} diff --git a/transfer/transfer.go b/transfer/transfer.go new file mode 100644 index 0000000..7dc3f36 --- /dev/null +++ b/transfer/transfer.go @@ -0,0 +1,177 @@ +package transfer + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/backube/pvc-transfer/endpoint" + "github.com/backube/pvc-transfer/transport" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + errorsutil "k8s.io/apimachinery/pkg/util/errors" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Transfer knows how to transfer PV data from a source to a destination +// Server creates an rsync server on the destination +type Server interface { + // Endpoint returns the endpoint used by the transfer + Endpoint() endpoint.Endpoint + // Transport returns the transport used by the transfer + Transport() transport.Transport + // ListenPort returns the port on which transfer server pod is listening on + ListenPort() int32 + // IsHealthy returns whether or not all Kube resources used by endpoint are healthy + IsHealthy(ctx context.Context, c client.Client) (bool, error) + // Completed returns whether or not the current attempt of transfer is completed + Completed(ctx context.Context, c client.Client) (bool, error) + // PVCs returns the list of PVCs the transfer will migrate + PVCs() []*corev1.PersistentVolumeClaim + // MarkForCleanup add the required labels to all the resources for + // cleaning up + MarkForCleanup(ctx context.Context, c client.Client, key, value string) error +} + +type Client interface { + // Transport returns the transport used by the transfer + Transport() transport.Transport + // PVCs returns the list of PVCs the transfer will migrate + PVCs() []*corev1.PersistentVolumeClaim + // IsCompleted returns whether the client is done + Status(ctx context.Context, c client.Client) (*Status, error) + // MarkForCleanup adds a key-value label to all the resources to be cleaned up + MarkForCleanup(ctx context.Context, c client.Client, key, value string) error +} + +// PodOptions allow callers to pass custom configuration for the transfer pods +type PodOptions struct { + // PodSecurityContext determines what GID the rsync process gets + // In case of shared storage SupplementalGroups is configured to get the gid + // In case of block storage FSGroup is configured to get the gid + PodSecurityContext corev1.PodSecurityContext + // ContainerSecurityContext determines what selinux labels, UID and drop capabilities + // are required for the containers in rsync transfer pod via SELinuxOptions, RunAsUser and + // 
Capabilities fields respectively. + ContainerSecurityContext corev1.SecurityContext + // NodeName allows pods to be scheduled on a specific node. This is especially required for + // client pods where the PVCs are bound to a specific region and the node where the pod runs + // has to be in the same region as the PVC for it to be scheduled and bound. + NodeName string + // NodeSelector is a wider net for scheduling the pods on nodes than NodeName. + NodeSelector map[string]string + // Resources allows for configuring the resources consumed by the transfer pods. In general + // it is good to provision the destination transfer pod with the same or larger resources than the source + // so that the network is not congested. + Resources corev1.ResourceRequirements +} + +type Status struct { + Running *Running + Completed *Completed +} + +type Running struct { + StartedAt *metav1.Time +} + +type Completed struct { + Successful bool + Failure bool + FinishedAt *metav1.Time +} + +// IsPodHealthy is a utility function that can be used by various +// implementations to check if the server pod deployed is healthy +func IsPodHealthy(ctx context.Context, c client.Client, pod client.ObjectKey) (bool, error) { + p := &corev1.Pod{} + + err := c.Get(ctx, pod, p) + if err != nil { + return false, err + } + + return areContainersReady(p) +} + +// IsPodCompleted is a utility function that can be used by various +// implementations to check if the server pod deployed is completed. +// If containerName is an empty string, it will check for completion of +// all the containers +func IsPodCompleted(ctx context.Context, c client.Client, podKey client.ObjectKey, containerName string) (bool, error) { + pod := &corev1.Pod{} + err := c.Get(ctx, podKey, pod) + if err != nil { + return false, err + } + + if len(pod.Status.ContainerStatuses) != 2 { + return false, fmt.Errorf("expected two container statuses, found %d, for pod %s", + len(pod.Status.ContainerStatuses), client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}) + } + + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerName != "" && containerStatus.Name == containerName { + return containerStatus.State.Terminated != nil, nil + } else { + if containerStatus.State.Terminated == nil { + return false, nil + } + } + } + return true, nil +} + +func areContainersReady(pod *corev1.Pod) (bool, error) { + if len(pod.Status.ContainerStatuses) != 2 { + return false, fmt.Errorf("expected two container statuses, found %d, for pod %s", + len(pod.Status.ContainerStatuses), client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}) + } + + for _, containerStatus := range pod.Status.ContainerStatuses { + if !containerStatus.Ready { + return false, fmt.Errorf("container %s in pod %s is not ready", + containerStatus.Name, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}) + } + } + return true, nil +} + +// AreFilteredPodsHealthy is a utility function that can be used by various +// implementations to check if the server pods deployed with some label selectors +// are healthy.
It returns true if at least one of the matching pods is healthy. +func AreFilteredPodsHealthy(ctx context.Context, c client.Client, namespace string, labels fields.Set) (bool, error) { + pList := &corev1.PodList{} + + err := c.List(ctx, pList, client.InNamespace(namespace), client.MatchingFields(labels)) + if err != nil { + return false, err + } + + errs := []error{} + + for i := range pList.Items { + podReady, err := areContainersReady(&pList.Items[i]) + if err != nil { + errs = append(errs, err) + } + if podReady { + return true, nil + } + } + + return false, errorsutil.NewAggregate(errs) +} + +// GeneratePassword returns a random 24-character password built from upper and lower case ASCII letters +func GeneratePassword() string { + var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + rand.Seed(time.Now().UnixNano()) + password := make([]byte, 24) + for i := range password { + password[i] = letters[rand.Intn(len(letters))] + } + return string(password) +}
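
For reference, a minimal sketch of how a caller might wire these APIs together. It assumes a transport.Transport and an endpoint.Endpoint have already been created by their respective packages (their constructors are not part of this change); the helper name ensureRsyncServer, the example labels, and the empty PodOptions are illustrative only, not part of the library.

package example

import (
	"context"

	"github.com/backube/pvc-transfer/endpoint"
	"github.com/backube/pvc-transfer/transfer"
	"github.com/backube/pvc-transfer/transfer/rsync"
	"github.com/backube/pvc-transfer/transport"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ensureRsyncServer is a hypothetical helper showing how the pieces in this
// change fit together; error handling and option tuning are kept minimal.
func ensureRsyncServer(ctx context.Context, c client.Client,
	t transport.Transport, e endpoint.Endpoint,
	claim *corev1.PersistentVolumeClaim) (transfer.Server, error) {

	// Wrap the claim in a managed list; all PVCs in a list must share a namespace.
	pvcList, err := transfer.NewPVCList(claim)
	if err != nil {
		return nil, err
	}

	// Random 24-character password shared by the rsync client and server.
	password := transfer.GeneratePassword()

	// Creates (or updates) the ConfigMap, Secret, ServiceAccount, Role,
	// RoleBinding and rsync server Pod in the claim's namespace.
	return rsync.NewRsyncTransferServer(ctx, c, pvcList, t, e,
		map[string]string{"app": "pvc-transfer"}, nil, password,
		transfer.PodOptions{})
}

Note that rsync.AddToScheme must have been called on the client's scheme beforehand so the core and RBAC objects created by the reconcilers can be encoded and decoded.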