Skip to content

Commit

Permalink
Merge pull request #253 from grycap/dev-slangarita1
Browse files Browse the repository at this point in the history
Refactor of listing services and Interlink features
  • Loading branch information
catttam authored Sep 5, 2024
2 parents 628ec37 + 62629d6 commit 1c9e3df
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 152 deletions.
48 changes: 29 additions & 19 deletions pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,21 @@ func (k *KubeBackend) GetInfo() *types.ServerlessBackendInfo {

// ListServices returns a slice with all services registered in the provided namespace
func (k *KubeBackend) ListServices() ([]*types.Service, error) {
// Get the list with all podTemplates
podTemplates, err := k.kubeClientset.CoreV1().PodTemplates(k.namespace).List(context.TODO(), metav1.ListOptions{})
// Get the list with all Knative services
configmaps, err := getAllServicesConfigMaps(k.namespace, k.kubeClientset)
if err != nil {
log.Printf("WARNING: %v\n", err)
return nil, err
}

services := []*types.Service{}
for _, podTemplate := range podTemplates.Items {
// Get service from configMap's FDL
svc, err := getServiceFromFDL(podTemplate.Name, k.namespace, k.kubeClientset)

for _, cm := range configmaps.Items {
service, err := getServiceFromConfigMap(&cm)
if err != nil {
log.Printf("WARNING: %v\n", err)
} else {
services = append(services, svc)
return nil, err
}
services = append(services, service)
}

return services, nil
}

Expand Down Expand Up @@ -148,8 +146,14 @@ func (k *KubeBackend) ReadService(name string) (*types.Service, error) {
return nil, err
}

// Get the configMap of the Service
cm, err := k.kubeClientset.CoreV1().ConfigMaps(k.namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name)
}

// Get service from configMap's FDL
svc, err := getServiceFromFDL(name, k.namespace, k.kubeClientset)
svc, err := getServiceFromConfigMap(cm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -242,17 +246,12 @@ func (k *KubeBackend) DeleteService(service types.Service) error {
return nil
}

func getServiceFromFDL(name string, namespace string, kubeClientset kubernetes.Interface) (*types.Service, error) {
// Get the configMap of the Service
cm, err := kubeClientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name)
}
func getServiceFromConfigMap(cm *v1.ConfigMap) (*types.Service, error) {
service := &types.Service{}

// Unmarshal the FDL stored in the configMap
if err = yaml.Unmarshal([]byte(cm.Data[types.FDLFileName]), service); err != nil {
return nil, fmt.Errorf("the FDL of the service \"%s\" cannot be read", name)
if err := yaml.Unmarshal([]byte(cm.Data[types.FDLFileName]), service); err != nil {
return nil, fmt.Errorf("the FDL of the service \"%s\" cannot be read", cm.Name)
}

// Add the script to the service from configmap's script value
Expand Down Expand Up @@ -364,6 +363,17 @@ func deleteServiceConfigMap(name string, namespace string, kubeClientset kuberne
return nil
}

func getAllServicesConfigMaps(namespace string, kubeClientset kubernetes.Interface) (*v1.ConfigMapList, error) {
listOpts := metav1.ListOptions{
LabelSelector: "oscar_service",
}
configMapsList, err := kubeClientset.CoreV1().ConfigMaps(namespace).List(context.TODO(), listOpts)
if err != nil {
return nil, err
}
return configMapsList, nil
}

func deleteServiceJobs(name string, namespace string, kubeClientset kubernetes.Interface) error {
// ListOptions to select all the associated jobs with the specified service
listOpts := metav1.ListOptions{
Expand Down
38 changes: 0 additions & 38 deletions pkg/backends/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,12 @@ func TestKubeGetInfo(t *testing.T) {
}

func TestKubeListServices(t *testing.T) {
validPodTemplateListReactor := func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
podTemplateList := &v1.PodTemplateList{
Items: []v1.PodTemplate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "testnamespace",
},
Template: v1.PodTemplateSpec{},
},
},
}
return true, podTemplateList, nil
}

t.Run("valid list", func(t *testing.T) {
clientset := fake.NewSimpleClientset()

back := MakeKubeBackend(clientset, testConfig)

// Return a valid PodTemplateList
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", validPodTemplateListReactor)

// Return a valid configMap
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("get", "configmaps", validConfigMapReaction)

Expand All @@ -146,29 +129,11 @@ func TestKubeListServices(t *testing.T) {
}
})

t.Run("listing podTemplates throws an error", func(t *testing.T) {
clientset := fake.NewSimpleClientset()

back := MakeKubeBackend(clientset, testConfig)

// Return an error listing PodTemplates
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", errorReaction)

// Call
_, err := back.ListServices()
if err == nil {
t.Error("expecting error, got: nil")
}
})

t.Run("getServiceFromFDL throws error getting configMap", func(t *testing.T) {
clientset := fake.NewSimpleClientset()

back := MakeKubeBackend(clientset, testConfig)

// Return a valid PodTemplateList
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", validPodTemplateListReactor)

// Return an error getting the configMap
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("get", "configmaps", errorReaction)

Expand Down Expand Up @@ -198,9 +163,6 @@ func TestKubeListServices(t *testing.T) {
return true, validCM, nil
}

// Return a valid PodTemplateList
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", validPodTemplateListReactor)

// Return a valid configMap with invalid FDL
back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("get", "configmaps", validConfigMapWithInvalidFDLReactor)

Expand Down
23 changes: 13 additions & 10 deletions pkg/backends/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,20 @@ func (kn *KnativeBackend) GetInfo() *types.ServerlessBackendInfo {
// ListServices returns a slice with all services registered in the provided namespace
func (kn *KnativeBackend) ListServices() ([]*types.Service, error) {
// Get the list with all Knative services
knSvcs, err := kn.knClientset.ServingV1().Services(kn.namespace).List(context.TODO(), metav1.ListOptions{})
configmaps, err := getAllServicesConfigMaps(kn.namespace, kn.kubeClientset)
if err != nil {
log.Printf("WARNING: %v\n", err)
return nil, err
}

services := []*types.Service{}
for _, knSvc := range knSvcs.Items {
// Get service from configMap's FDL
svc, err := getServiceFromFDL(knSvc.Name, kn.namespace, kn.kubeClientset)

for _, cm := range configmaps.Items {
service, err := getServiceFromConfigMap(&cm)
if err != nil {
log.Printf("WARNING: %v\n", err)
} else {
services = append(services, svc)
return nil, err
}
services = append(services, service)
}

return services, nil
}

Expand Down Expand Up @@ -151,8 +149,13 @@ func (kn *KnativeBackend) ReadService(name string) (*types.Service, error) {
return nil, err
}

// Get the configMap of the Service
cm, err := kn.kubeClientset.CoreV1().ConfigMaps(kn.namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name)
}
// Get service from configMap's FDL
svc, err := getServiceFromFDL(name, kn.namespace, kn.kubeClientset)
svc, err := getServiceFromConfigMap(cm)
if err != nil {
return nil, err
}
Expand Down
11 changes: 0 additions & 11 deletions pkg/backends/knative_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,6 @@ func TestKnativeListServices(t *testing.T) {
[]k8stesting.SimpleReactor{knServiceListReactor},
false,
},
{
"Error listing knative services",
[]k8stesting.SimpleReactor{},
[]k8stesting.SimpleReactor{
{
Verb: "list",
Resource: "services",
Reaction: errorReaction,
}},
true,
},
{
"Error getting the configMap",
[]k8stesting.SimpleReactor{
Expand Down
24 changes: 14 additions & 10 deletions pkg/backends/openfaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,20 @@ func (of *OpenfaasBackend) GetInfo() *types.ServerlessBackendInfo {

// ListServices returns a slice with all services registered in the provided namespace
func (of *OpenfaasBackend) ListServices() ([]*types.Service, error) {
// Get the list with all deployments
deployments, err := of.kubeClientset.AppsV1().Deployments(of.namespace).List(context.TODO(), metav1.ListOptions{})
// Get the list with all Knative services
configmaps, err := getAllServicesConfigMaps(of.namespace, of.kubeClientset)
if err != nil {
log.Printf("WARNING: %v\n", err)
return nil, err
}

services := []*types.Service{}
for _, deployment := range deployments.Items {
// Get service from configMap's FDL
svc, err := getServiceFromFDL(deployment.Name, of.namespace, of.kubeClientset)

for _, cm := range configmaps.Items {
service, err := getServiceFromConfigMap(&cm)
if err != nil {
log.Printf("WARNING: %v\n", err)
} else {
services = append(services, svc)
return nil, err
}
services = append(services, service)
}

return services, nil
Expand Down Expand Up @@ -230,8 +229,13 @@ func (of *OpenfaasBackend) ReadService(name string) (*types.Service, error) {
return nil, err
}

// Get the configMap of the Service
cm, err := of.kubeClientset.CoreV1().ConfigMaps(of.namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name)
}
// Get service from configMap's FDL
svc, err := getServiceFromFDL(name, of.namespace, of.kubeClientset)
svc, err := getServiceFromConfigMap(cm)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg)

// Service is created by an EGI user

if !isAdminUser {
uid, err := auth.GetUIDFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
return
}

// Set UID from owner
Expand All @@ -83,6 +85,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
mc, err := auth.GetMultitenancyConfigFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
return
}

full_uid := auth.FormatUID(uid)
Expand All @@ -93,6 +96,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
err := checkIdentity(&service, cfg, authHeader)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintln(err))
return
}
break
}
Expand Down
42 changes: 14 additions & 28 deletions pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package handlers

import (
"context"
"encoding/base64"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -99,33 +98,24 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
return
}

// Make event envVar
// Initialize event envVar and args var
event := v1.EnvVar{}
var args []string

var args string
if cfg.InterLinkAvailable && service.InterLinkNodeName != "" {
event = v1.EnvVar{
Name: types.EventVariable,
Value: base64.StdEncoding.EncodeToString([]byte(eventBytes)),
}
args = fmt.Sprintf("\" wget %s -O %s && chmod 0755 %s && echo \\$%s | base64 -d | %s \"", cfg.SupervisorURL, SupervisorPath, SupervisorPath, types.EventVariable, SupervisorPath)
podSpec.NodeSelector = map[string]string{
NodeSelectorKey: service.InterLinkNodeName,
}
podSpec.DNSPolicy = InterLinkDNSPolicy
podSpec.RestartPolicy = InterLinkRestartPolicy
podSpec.Tolerations = []v1.Toleration{
{
Key: InterLinkTolerationKey,
Operator: InterLinkTolerationOperator,
},
}
command, event, args = types.SetInterlinkJob(podSpec, service, cfg, eventBytes)
} else {
event = v1.EnvVar{
Name: types.EventVariable,
Value: string(eventBytes),

if service.Mount.Provider != "" {
args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath()) + ";echo \"I finish\" > /tmpfolder/finish-file;"}
types.SetMount(podSpec, *service, cfg)
} else {
event = v1.EnvVar{
Name: types.EventVariable,
Value: string(eventBytes),
}
args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())}
}
args = fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())
}

// Make JOB_UUID envVar
Expand All @@ -150,16 +140,12 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
for i, c := range podSpec.Containers {
if c.Name == types.ContainerName {
podSpec.Containers[i].Command = command
podSpec.Containers[i].Args = []string{"-c", args}
podSpec.Containers[i].Args = args
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, event)
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, jobUUIDVar)
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, resourceIDVar)
}
}
if service.Mount.Provider != "" {
types.SetMount(podSpec, *service, cfg)
podSpec.Containers[0].Args = []string{"-c", args + ";echo \"I finish\" > /tmpfolder/finish-file;"}
}

// Delegate job if can't be scheduled and has defined replicas
if rm != nil && service.HasReplicas() {
Expand Down
11 changes: 3 additions & 8 deletions pkg/handlers/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package handlers
import (
"fmt"
"net/http"
"slices"
"strings"

"github.com/gin-gonic/gin"
Expand All @@ -42,19 +43,13 @@ func MakeListHandler(back types.ServerlessBackend) gin.HandlerFunc {
uid, err := auth.GetUIDFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
return
}

var allowedServicesForUser []*types.Service
for _, service := range services {
if len(service.AllowedUsers) == 0 {
if len(service.AllowedUsers) == 0 || slices.Contains(service.AllowedUsers, uid) {
allowedServicesForUser = append(allowedServicesForUser, service)
continue
}
for _, id := range service.AllowedUsers {
if uid == id {
allowedServicesForUser = append(allowedServicesForUser, service)
break
}
}
}

Expand Down
Loading

0 comments on commit 1c9e3df

Please sign in to comment.