-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Alay Patel <[email protected]>
- Loading branch information
1 parent
7052d05
commit 9267845
Showing
2 changed files
with
566 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,353 @@ | ||
package rsync | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/backube/pvc-transfer/endpoint" | ||
"github.com/backube/pvc-transfer/internal/utils" | ||
"github.com/backube/pvc-transfer/transfer" | ||
"github.com/backube/pvc-transfer/transport" | ||
"github.com/backube/pvc-transfer/transport/stunnel" | ||
corev1 "k8s.io/api/core/v1" | ||
rbacv1 "k8s.io/api/rbac/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
errorsutil "k8s.io/apimachinery/pkg/util/errors" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" | ||
) | ||
|
||
type Client struct { | ||
pvcList transfer.PVCList | ||
|
||
username string | ||
password string | ||
transportClient transport.Transport | ||
endpoint endpoint.Endpoint | ||
|
||
labels map[string]string | ||
ownerRefs []metav1.OwnerReference | ||
options transfer.PodOptions | ||
nameSuffix string | ||
} | ||
|
||
func (tc *Client) Transport() transport.Transport { | ||
return tc.transportClient | ||
} | ||
|
||
func (tc *Client) PVCs() []*corev1.PersistentVolumeClaim { | ||
pvcs := []*corev1.PersistentVolumeClaim{} | ||
for _, pvc := range tc.pvcList.PVCs() { | ||
pvcs = append(pvcs, pvc.Claim()) | ||
} | ||
return pvcs | ||
} | ||
|
||
func (tc *Client) Status(ctx context.Context, c client.Client) (*transfer.Status, error) { | ||
podList := &corev1.PodList{} | ||
err := c.List(ctx, podList, client.MatchingLabels(tc.labels)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
for _, pod := range podList.Items { | ||
if pod.Status.ContainerStatuses != nil || len(pod.Status.ContainerStatuses) > 0 { | ||
for _, containerStatus := range pod.Status.ContainerStatuses { | ||
if containerStatus.Name == "rsync" && containerStatus.State.Terminated != nil { | ||
if containerStatus.State.Terminated.ExitCode == 0 { | ||
return &transfer.Status{ | ||
Completed: &transfer.Completed{ | ||
Successful: true, | ||
Failure: false, | ||
FinishedAt: &containerStatus.State.Terminated.FinishedAt, | ||
}, | ||
}, nil | ||
} else { | ||
return &transfer.Status{ | ||
Running: nil, | ||
Completed: &transfer.Completed{ | ||
Successful: false, | ||
Failure: true, | ||
FinishedAt: &containerStatus.State.Terminated.FinishedAt, | ||
}, | ||
}, nil | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return nil, fmt.Errorf("unable to find the appropriate container to inspect status for rsync transfer") | ||
} | ||
|
||
func (tc *Client) MarkForCleanup(ctx context.Context, c client.Client, key, value string) error { | ||
err := tc.Transport().MarkForCleanup(ctx, c, key, value) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
podList := &corev1.PodList{} | ||
err = c.List(ctx, podList, client.MatchingLabels(tc.labels)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, p := range podList.Items { | ||
if err := utils.UpdateWithLabel(ctx, c, &p, key, value); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func NewRsyncTransferClient(ctx context.Context, c client.Client, | ||
pvcList transfer.PVCList, | ||
t transport.Transport, | ||
e endpoint.Endpoint, | ||
labels map[string]string, | ||
ownerRefs []metav1.OwnerReference, | ||
password string, podOptions transfer.PodOptions) (transfer.Client, error) { | ||
tc := &Client{ | ||
username: "root", | ||
password: password, | ||
pvcList: pvcList, | ||
transportClient: t, | ||
endpoint: e, | ||
labels: labels, | ||
ownerRefs: ownerRefs, | ||
options: podOptions, | ||
} | ||
|
||
var namespace string | ||
namespaces := pvcList.GetNamespaces() | ||
if len(namespaces) > 0 { | ||
namespace = pvcList.GetNamespaces()[0] | ||
} | ||
|
||
for _, ns := range namespaces { | ||
if ns != namespace { | ||
return nil, fmt.Errorf("PVC list provided has pvcs in different namespaces which is not supported") | ||
} | ||
} | ||
|
||
if namespace == "" { | ||
return nil, fmt.Errorf("ether PVC list is empty or namespace is not specified") | ||
} | ||
|
||
tc.nameSuffix = transfer.GetNames(pvcList)[namespace][:10] | ||
err := tc.createRsyncClient(ctx, c, pvcList.GetNamespaces()[0]) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return tc, nil | ||
} | ||
|
||
func (tc *Client) createRsyncClient(ctx context.Context, c client.Client, ns string) error { | ||
var errs []error | ||
|
||
podList := &corev1.PodList{} | ||
err := c.List(context.Background(), podList, client.MatchingLabels(tc.labels)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(podList.Items) > 0 { | ||
return nil | ||
} | ||
|
||
rsyncOptions, err := GetRsyncCommandDefaultFlags() | ||
if err != nil { | ||
return err | ||
} | ||
for _, pvc := range tc.pvcList.InNamespace(ns).PVCs() { | ||
// create Rsync command for PVC | ||
rsyncContainerCommand := tc.getRsyncCommand(rsyncOptions, pvc) | ||
// create rsync container | ||
containers := []corev1.Container{ | ||
{ | ||
Name: RsyncContainer, | ||
Image: rsyncImage, | ||
Command: rsyncContainerCommand, | ||
Env: []corev1.EnvVar{ | ||
{ | ||
Name: "RSYNC_PASSWORD", | ||
Value: tc.password, | ||
}, | ||
}, | ||
|
||
VolumeMounts: []corev1.VolumeMount{ | ||
{ | ||
Name: "mnt", | ||
MountPath: fmt.Sprintf("/mnt/%s/%s", pvc.Claim().Namespace, pvc.LabelSafeName()), | ||
}, | ||
{ | ||
Name: "rsync-communication", | ||
MountPath: "/usr/share/rsync", | ||
}, | ||
}, | ||
}, | ||
} | ||
// attach transport containers | ||
customizeTransportClientContainers(tc.Transport()) | ||
containers = append(containers, tc.Transport().Containers()...) | ||
|
||
volumes := []corev1.Volume{ | ||
{ | ||
Name: "mnt", | ||
VolumeSource: corev1.VolumeSource{ | ||
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ | ||
ClaimName: pvc.Claim().Name, | ||
}, | ||
}, | ||
}, | ||
{ | ||
Name: "rsync-communication", | ||
VolumeSource: corev1.VolumeSource{ | ||
EmptyDir: &corev1.EmptyDirVolumeSource{Medium: corev1.StorageMediumDefault}, | ||
}, | ||
}, | ||
} | ||
volumes = append(volumes, tc.Transport().Volumes()...) | ||
podSpec := corev1.PodSpec{ | ||
Containers: containers, | ||
Volumes: volumes, | ||
RestartPolicy: corev1.RestartPolicyNever, | ||
} | ||
|
||
applyPodOptions(&podSpec, tc.options) | ||
|
||
pod := corev1.Pod{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
GenerateName: "rsync-", | ||
Namespace: pvc.Claim().Namespace, | ||
Labels: tc.labels, | ||
OwnerReferences: tc.ownerRefs, | ||
}, | ||
Spec: podSpec, | ||
} | ||
|
||
err := c.Create(ctx, &pod, &client.CreateOptions{}) | ||
errs = append(errs, err) | ||
} | ||
|
||
return errorsutil.NewAggregate(errs) | ||
} | ||
|
||
func (tc *Client) getRsyncCommand(rsyncOptions []string, pvc transfer.PVC) []string { | ||
// TODO: add a stub for null transport | ||
rsyncCommand := []string{"/usr/bin/rsync"} | ||
rsyncCommand = append(rsyncCommand, rsyncOptions...) | ||
rsyncCommand = append(rsyncCommand, fmt.Sprintf("/mnt/%s/%s", pvc.Claim().Namespace, pvc.LabelSafeName())) | ||
rsyncCommand = append(rsyncCommand, | ||
fmt.Sprintf("rsync://%s@%s/%s --port %d", | ||
tc.username, | ||
tc.Transport().Hostname(), | ||
pvc.LabelSafeName(), tc.Transport().ListenPort())) | ||
rsyncCommandBashScript := fmt.Sprintf( | ||
"trap \"touch /usr/share/rsync/rsync-client-container-done\" EXIT SIGINT SIGTERM; timeout=120; SECONDS=0; while [ $SECONDS -lt $timeout ]; do nc -z localhost %d; rc=$?; if [ $rc -eq 0 ]; then %s; rc=$?; break; fi; done; exit $rc;", | ||
tc.Transport().ListenPort(), | ||
strings.Join(rsyncCommand, " ")) | ||
rsyncContainerCommand := []string{ | ||
"/bin/bash", | ||
"-c", | ||
rsyncCommandBashScript, | ||
} | ||
return rsyncContainerCommand | ||
} | ||
|
||
// customizeTransportClientContainers customizes transport's client containers for specific rsync communication | ||
func customizeTransportClientContainers(transportClient transport.Transport) { | ||
switch transportClient.Type() { | ||
case stunnel.TransportTypeStunnel: | ||
var stunnelContainer *corev1.Container | ||
for i := range transportClient.Containers() { | ||
c := &transportClient.Containers()[i] | ||
if c.Name == stunnel.Container { | ||
stunnelContainer = c | ||
} | ||
} | ||
stunnelContainer.Command = []string{ | ||
"/bin/bash", | ||
"-c", | ||
`/bin/stunnel /etc/stunnel/stunnel.conf | ||
while true | ||
do test -f /usr/share/rsync/rsync-client-container-done | ||
if [ $? -eq 0 ] | ||
then | ||
break | ||
fi | ||
done | ||
exit 0`, | ||
} | ||
stunnelContainer.VolumeMounts = append( | ||
stunnelContainer.VolumeMounts, | ||
corev1.VolumeMount{ | ||
Name: "rsync-communication", | ||
MountPath: "/usr/share/rsync", | ||
}) | ||
} | ||
} | ||
|
||
func (tc *Client) reconcileServiceAccount(ctx context.Context, c client.Client, namespace string) error { | ||
sa := &corev1.ServiceAccount{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, tc.nameSuffix), | ||
Namespace: namespace, | ||
}, | ||
} | ||
_, err := ctrlutil.CreateOrUpdate(ctx, c, sa, func() error { | ||
sa.Labels = tc.labels | ||
sa.OwnerReferences = tc.ownerRefs | ||
return nil | ||
}) | ||
return err | ||
} | ||
|
||
func (tc *Client) reconcileRole(ctx context.Context, c client.Client, namespace string) error { | ||
role := &rbacv1.Role{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: fmt.Sprintf("%s-%s", rsyncRole, tc.nameSuffix), | ||
Namespace: namespace, | ||
}, | ||
} | ||
_, err := ctrlutil.CreateOrUpdate(ctx, c, role, func() error { | ||
role.OwnerReferences = tc.ownerRefs | ||
role.Labels = tc.labels | ||
role.Rules = []rbacv1.PolicyRule{ | ||
{ | ||
APIGroups: []string{"security.openshift.io"}, | ||
Resources: []string{"securitycontextconstraints"}, | ||
// Must match the name of the SCC that is deployed w/ the operator | ||
// config/openshift/mover_scc.yaml | ||
ResourceNames: []string{DefaultSCCName}, | ||
Verbs: []string{"use"}, | ||
}, | ||
} | ||
return nil | ||
}) | ||
return err | ||
} | ||
|
||
func (tc *Client) reconcileRoleBinding(ctx context.Context, c client.Client, namespace string) error { | ||
roleBinding := &rbacv1.RoleBinding{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, tc.nameSuffix), | ||
Namespace: namespace, | ||
}, | ||
} | ||
_, err := ctrlutil.CreateOrUpdate(ctx, c, roleBinding, func() error { | ||
roleBinding.OwnerReferences = tc.ownerRefs | ||
roleBinding.Labels = tc.labels | ||
roleBinding.RoleRef = rbacv1.RoleRef{ | ||
APIGroup: "rbac.authorization.k8s.io", | ||
Kind: "Role", | ||
Name: fmt.Sprintf("%s-%s", rsyncRole, tc.nameSuffix), | ||
} | ||
roleBinding.Subjects = []rbacv1.Subject{ | ||
{Kind: "ServiceAccount", Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, tc.nameSuffix)}, | ||
} | ||
return nil | ||
}) | ||
return err | ||
} |
Oops, something went wrong.