Skip to content

Commit

Permalink
add rsync client implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Alay Patel <[email protected]>
  • Loading branch information
alaypatel07 committed Feb 17, 2022
1 parent 96c56da commit fdf0c4b
Show file tree
Hide file tree
Showing 2 changed files with 566 additions and 0 deletions.
353 changes: 353 additions & 0 deletions transfer/rsync/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
package rsync

import (
"context"
"fmt"
"strings"

"github.com/backube/pvc-transfer/endpoint"
"github.com/backube/pvc-transfer/internal/utils"
"github.com/backube/pvc-transfer/transfer"
"github.com/backube/pvc-transfer/transport"
"github.com/backube/pvc-transfer/transport/stunnel"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

type Client struct {
pvcList transfer.PVCList

username string
password string
transportClient transport.Transport
endpoint endpoint.Endpoint

labels map[string]string
ownerRefs []metav1.OwnerReference
options transfer.PodOptions
nameSuffix string
}

func (tc *Client) Transport() transport.Transport {
return tc.transportClient
}

func (tc *Client) PVCs() []*corev1.PersistentVolumeClaim {
pvcs := []*corev1.PersistentVolumeClaim{}
for _, pvc := range tc.pvcList.PVCs() {
pvcs = append(pvcs, pvc.Claim())
}
return pvcs
}

func (tc *Client) Status(ctx context.Context, c client.Client) (*transfer.Status, error) {
podList := &corev1.PodList{}
err := c.List(ctx, podList, client.MatchingLabels(tc.labels))
if err != nil {
return nil, err
}

for _, pod := range podList.Items {
if pod.Status.ContainerStatuses != nil || len(pod.Status.ContainerStatuses) > 0 {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name == "rsync" && containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode == 0 {
return &transfer.Status{
Completed: &transfer.Completed{
Successful: true,
Failure: false,
FinishedAt: &containerStatus.State.Terminated.FinishedAt,
},
}, nil
} else {
return &transfer.Status{
Running: nil,
Completed: &transfer.Completed{
Successful: false,
Failure: true,
FinishedAt: &containerStatus.State.Terminated.FinishedAt,
},
}, nil
}
}
}
}
}
return nil, fmt.Errorf("unable to find the appropriate container to inspect status for rsync transfer")
}

func (tc *Client) MarkForCleanup(ctx context.Context, c client.Client, key, value string) error {
err := tc.Transport().MarkForCleanup(ctx, c, key, value)
if err != nil {
return err
}

podList := &corev1.PodList{}
err = c.List(ctx, podList, client.MatchingLabels(tc.labels))
if err != nil {
return err
}

for _, p := range podList.Items {
if err := utils.UpdateWithLabel(ctx, c, &p, key, value); err != nil {
return err
}
}
return nil
}

func NewRsyncTransferClient(ctx context.Context, c client.Client,
pvcList transfer.PVCList,
t transport.Transport,
e endpoint.Endpoint,
labels map[string]string,
ownerRefs []metav1.OwnerReference,
password string, podOptions transfer.PodOptions) (transfer.Client, error) {
tc := &Client{
username: "root",
password: password,
pvcList: pvcList,
transportClient: t,
endpoint: e,
labels: labels,
ownerRefs: ownerRefs,
options: podOptions,
}

var namespace string
namespaces := pvcList.GetNamespaces()
if len(namespaces) > 0 {
namespace = pvcList.GetNamespaces()[0]
}

for _, ns := range namespaces {
if ns != namespace {
return nil, fmt.Errorf("PVC list provided has pvcs in different namespaces which is not supported")
}
}

if namespace == "" {
return nil, fmt.Errorf("ether PVC list is empty or namespace is not specified")
}

tc.nameSuffix = transfer.GetNames(pvcList)[namespace][:10]
err := tc.createRsyncClient(ctx, c, pvcList.GetNamespaces()[0])
if err != nil {
return nil, err
}

return tc, nil
}

func (tc *Client) createRsyncClient(ctx context.Context, c client.Client, ns string) error {
var errs []error

podList := &corev1.PodList{}
err := c.List(context.Background(), podList, client.MatchingLabels(tc.labels))
if err != nil {
return err
}

if len(podList.Items) > 0 {
return nil
}

rsyncOptions, err := GetRsyncCommandDefaultFlags()
if err != nil {
return err
}
for _, pvc := range tc.pvcList.InNamespace(ns).PVCs() {
// create Rsync command for PVC
rsyncContainerCommand := tc.getRsyncCommand(rsyncOptions, pvc)
// create rsync container
containers := []corev1.Container{
{
Name: RsyncContainer,
Image: rsyncImage,
Command: rsyncContainerCommand,
Env: []corev1.EnvVar{
{
Name: "RSYNC_PASSWORD",
Value: tc.password,
},
},

VolumeMounts: []corev1.VolumeMount{
{
Name: "mnt",
MountPath: fmt.Sprintf("/mnt/%s/%s", pvc.Claim().Namespace, pvc.LabelSafeName()),
},
{
Name: "rsync-communication",
MountPath: "/usr/share/rsync",
},
},
},
}
// attach transport containers
customizeTransportClientContainers(tc.Transport())
containers = append(containers, tc.Transport().Containers()...)

volumes := []corev1.Volume{
{
Name: "mnt",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Claim().Name,
},
},
},
{
Name: "rsync-communication",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{Medium: corev1.StorageMediumDefault},
},
},
}
volumes = append(volumes, tc.Transport().Volumes()...)
podSpec := corev1.PodSpec{
Containers: containers,
Volumes: volumes,
RestartPolicy: corev1.RestartPolicyNever,
}

applyPodOptions(&podSpec, tc.options)

pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "rsync-",
Namespace: pvc.Claim().Namespace,
Labels: tc.labels,
OwnerReferences: tc.ownerRefs,
},
Spec: podSpec,
}

err := c.Create(ctx, &pod, &client.CreateOptions{})
errs = append(errs, err)
}

return errorsutil.NewAggregate(errs)
}

func (tc *Client) getRsyncCommand(rsyncOptions []string, pvc transfer.PVC) []string {
// TODO: add a stub for null transport
rsyncCommand := []string{"/usr/bin/rsync"}
rsyncCommand = append(rsyncCommand, rsyncOptions...)
rsyncCommand = append(rsyncCommand, fmt.Sprintf("/mnt/%s/%s", pvc.Claim().Namespace, pvc.LabelSafeName()))
rsyncCommand = append(rsyncCommand,
fmt.Sprintf("rsync://%s@%s/%s --port %d",
tc.username,
tc.Transport().Hostname(),
pvc.LabelSafeName(), tc.Transport().ListenPort()))
rsyncCommandBashScript := fmt.Sprintf(
"trap \"touch /usr/share/rsync/rsync-client-container-done\" EXIT SIGINT SIGTERM; timeout=120; SECONDS=0; while [ $SECONDS -lt $timeout ]; do nc -z localhost %d; rc=$?; if [ $rc -eq 0 ]; then %s; rc=$?; break; fi; done; exit $rc;",
tc.Transport().ListenPort(),
strings.Join(rsyncCommand, " "))
rsyncContainerCommand := []string{
"/bin/bash",
"-c",
rsyncCommandBashScript,
}
return rsyncContainerCommand
}

// customizeTransportClientContainers customizes transport's client containers for specific rsync communication
func customizeTransportClientContainers(transportClient transport.Transport) {
switch transportClient.Type() {
case stunnel.TransportTypeStunnel:
var stunnelContainer *corev1.Container
for i := range transportClient.Containers() {
c := &transportClient.Containers()[i]
if c.Name == stunnel.Container {
stunnelContainer = c
}
}
stunnelContainer.Command = []string{
"/bin/bash",
"-c",
`/bin/stunnel /etc/stunnel/stunnel.conf
while true
do test -f /usr/share/rsync/rsync-client-container-done
if [ $? -eq 0 ]
then
break
fi
done
exit 0`,
}
stunnelContainer.VolumeMounts = append(
stunnelContainer.VolumeMounts,
corev1.VolumeMount{
Name: "rsync-communication",
MountPath: "/usr/share/rsync",
})
}
}

func (tc *Client) reconcileServiceAccount(ctx context.Context, c client.Client, namespace string) error {
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, tc.nameSuffix),
Namespace: namespace,
},
}
_, err := ctrlutil.CreateOrUpdate(ctx, c, sa, func() error {
sa.Labels = tc.labels
sa.OwnerReferences = tc.ownerRefs
return nil
})
return err
}

func (tc *Client) reconcileRole(ctx context.Context, c client.Client, namespace string) error {
role := &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", rsyncRole, tc.nameSuffix),
Namespace: namespace,
},
}
_, err := ctrlutil.CreateOrUpdate(ctx, c, role, func() error {
role.OwnerReferences = tc.ownerRefs
role.Labels = tc.labels
role.Rules = []rbacv1.PolicyRule{
{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
// Must match the name of the SCC that is deployed w/ the operator
// config/openshift/mover_scc.yaml
ResourceNames: []string{DefaultSCCName},
Verbs: []string{"use"},
},
}
return nil
})
return err
}

func (tc *Client) reconcileRoleBinding(ctx context.Context, c client.Client, namespace string) error {
roleBinding := &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, tc.nameSuffix),
Namespace: namespace,
},
}
_, err := ctrlutil.CreateOrUpdate(ctx, c, roleBinding, func() error {
roleBinding.OwnerReferences = tc.ownerRefs
roleBinding.Labels = tc.labels
roleBinding.RoleRef = rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "Role",
Name: fmt.Sprintf("%s-%s", rsyncRole, tc.nameSuffix),
}
roleBinding.Subjects = []rbacv1.Subject{
{Kind: "ServiceAccount", Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, tc.nameSuffix)},
}
return nil
})
return err
}
Loading

0 comments on commit fdf0c4b

Please sign in to comment.