Skip to content

Commit

Permalink
add NewServerWithStunnelRoute to rsync/server.go
Browse files Browse the repository at this point in the history
Signed-off-by: Alay Patel <[email protected]>
  • Loading branch information
alaypatel07 committed Feb 20, 2022
1 parent e737951 commit 5b5c837
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 44 deletions.
74 changes: 57 additions & 17 deletions transfer/rsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"bytes"
"context"
"fmt"
"github.com/go-logr/logr"
"strconv"
"text/template"

"github.com/backube/pvc-transfer/endpoint"
"github.com/backube/pvc-transfer/endpoint/route"
"github.com/backube/pvc-transfer/internal/utils"
"github.com/backube/pvc-transfer/transfer"
"github.com/backube/pvc-transfer/transport"
"github.com/backube/pvc-transfer/transport/stunnel"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"k8s.io/apimachinery/pkg/types"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand All @@ -37,8 +39,8 @@ func AddToScheme(scheme *runtime.Scheme) error {

// APIsToWatch give a list of APIs to watch if using this package
// to deploy the endpoint
func APIsToWatch() ([]client.Object, error) {
return []client.Object{
func APIsToWatch() ([]ctrlclient.Object, error) {
return []ctrlclient.Object{
&corev1.Secret{},
&corev1.ConfigMap{},
&corev1.Pod{},
Expand Down Expand Up @@ -85,7 +87,7 @@ type rsyncConfigData struct {
AllowLocalhostOnly bool
}

type reconcileFunc func(ctx context.Context, c client.Client, namespace string) error
type reconcileFunc func(ctx context.Context, c ctrlclient.Client, namespace string) error

type server struct {
username string
Expand All @@ -111,17 +113,17 @@ func (s *server) Transport() transport.Transport {
return s.transportServer
}

func (s *server) IsHealthy(ctx context.Context, c client.Client) (bool, error) {
return transfer.IsPodHealthy(ctx, c, client.ObjectKey{Namespace: s.pvcList.Namespaces()[0], Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix)})
func (s *server) IsHealthy(ctx context.Context, c ctrlclient.Client) (bool, error) {
return transfer.IsPodHealthy(ctx, c, ctrlclient.ObjectKey{Namespace: s.pvcList.Namespaces()[0], Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix)})
}

func (s *server) Completed(ctx context.Context, c client.Client) (bool, error) {
return transfer.IsPodCompleted(ctx, c, client.ObjectKey{Namespace: s.pvcList.Namespaces()[0], Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix)}, "rsync")
func (s *server) Completed(ctx context.Context, c ctrlclient.Client) (bool, error) {
return transfer.IsPodCompleted(ctx, c, ctrlclient.ObjectKey{Namespace: s.pvcList.Namespaces()[0], Name: fmt.Sprintf("rsync-server-%s", s.nameSuffix)}, "rsync")
}

// MarkForCleanup marks the provided "obj" to be deleted at the end of the
// synchronization iteration.
func (s *server) MarkForCleanup(ctx context.Context, c client.Client, key, value string) error {
func (s *server) MarkForCleanup(ctx context.Context, c ctrlclient.Client, key, value string) error {
// mark endpoint for deletion
err := s.Endpoint().MarkForCleanup(ctx, c, key, value)
if err != nil {
Expand Down Expand Up @@ -216,14 +218,52 @@ func (s *server) ListenPort() int32 {
return s.listenPort
}

func NewServerWithStunnelRoute(ctx context.Context, c ctrlclient.Client, logger logr.Logger,
pvcList transfer.PVCList,
labels map[string]string,
ownerRefs []metav1.OwnerReference,
password string, podOptions transfer.PodOptions) (transfer.Server, error) {

var namespace string
namespaces := pvcList.Namespaces()
if len(namespaces) > 0 {
namespace = pvcList.Namespaces()[0]
}

for _, ns := range namespaces {
if ns != namespace {
return nil, fmt.Errorf("PVC list provided has pvcs in different namespaces which is not supported")
}
}

if namespace == "" {
return nil, fmt.Errorf("ether PVC list is empty or namespace is not specified")
}
hm := transfer.NamespaceHashForNames(pvcList)
e, err := route.New(ctx, c, logger, types.NamespacedName{
Namespace: namespace,
Name: hm[namespace],
}, route.EndpointTypePassthrough, labels, ownerRefs)
if err != nil {
return nil, err
}

t, err := stunnel.NewServer(ctx, c, logger, types.NamespacedName{Namespace: namespace, Name: hm[namespace]}, e, &transport.Options{Labels: labels, Owners: ownerRefs})
if err != nil {
return nil, err
}

return NewServer(ctx, c, logger, pvcList, t, e, labels, ownerRefs, password, podOptions)
}

// NewServer takes PVCList, transport and endpoint object and all
// the resources required by the transfer server pod as well as the transfer
// pod. All the PVCs in the list can be sync'ed via the endpoint object

// In order to generate the right RBAC, add the following lines to the Reconcile function annotations.
// +kubebuilder:rbac:groups=core,resources=secrets;configmaps;pods;serviceaccounts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings,verbs=get;list;watch;create;update;patch;delete
func NewServer(ctx context.Context, c client.Client, logger logr.Logger,
func NewServer(ctx context.Context, c ctrlclient.Client, logger logr.Logger,
pvcList transfer.PVCList,
t transport.Transport,
e endpoint.Endpoint,
Expand Down Expand Up @@ -287,7 +327,7 @@ func NewServer(ctx context.Context, c client.Client, logger logr.Logger,
return r, nil
}

func (s *server) reconcileConfigMap(ctx context.Context, c client.Client, namespace string) error {
func (s *server) reconcileConfigMap(ctx context.Context, c ctrlclient.Client, namespace string) error {
var rsyncConf bytes.Buffer
rsyncConfTemplate, err := template.New("config").Parse(rsyncServerConfTemplate)
if err != nil {
Expand Down Expand Up @@ -326,7 +366,7 @@ func (s *server) reconcileConfigMap(ctx context.Context, c client.Client, namesp
return err
}

func (s *server) reconcileSecret(ctx context.Context, c client.Client, namespace string) error {
func (s *server) reconcileSecret(ctx context.Context, c ctrlclient.Client, namespace string) error {
if s.password == "" {
e := fmt.Errorf("password is empty")
s.logger.Error(e, "unable to find password for rsyncServer")
Expand All @@ -352,7 +392,7 @@ func (s *server) reconcileSecret(ctx context.Context, c client.Client, namespace
return err
}

func (s *server) reconcilePod(ctx context.Context, c client.Client, namespace string) error {
func (s *server) reconcilePod(ctx context.Context, c ctrlclient.Client, namespace string) error {
volumeMounts := []corev1.VolumeMount{}
configVolumeMounts := s.getConfigVolumeMounts()
pvcVolumeMounts := s.getPVCVolumeMounts(namespace)
Expand Down Expand Up @@ -514,7 +554,7 @@ func (s *server) getConfigVolumeMounts() []corev1.VolumeMount {
}
}

func (s *server) reconcileServiceAccount(ctx context.Context, c client.Client, namespace string) error {
func (s *server) reconcileServiceAccount(ctx context.Context, c ctrlclient.Client, namespace string) error {
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", rsyncServiceAccount, s.nameSuffix),
Expand All @@ -529,7 +569,7 @@ func (s *server) reconcileServiceAccount(ctx context.Context, c client.Client, n
return err
}

func (s *server) reconcileRole(ctx context.Context, c client.Client, namespace string) error {
func (s *server) reconcileRole(ctx context.Context, c ctrlclient.Client, namespace string) error {
role := &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", rsyncRole, s.nameSuffix),
Expand All @@ -554,7 +594,7 @@ func (s *server) reconcileRole(ctx context.Context, c client.Client, namespace s
return err
}

func (s *server) reconcileRoleBinding(ctx context.Context, c client.Client, namespace string) error {
func (s *server) reconcileRoleBinding(ctx context.Context, c ctrlclient.Client, namespace string) error {
roleBinding := &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", rsyncRoleBinding, s.nameSuffix),
Expand Down
Loading

0 comments on commit 5b5c837

Please sign in to comment.