Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 60 deletions.
54 changes: 33 additions & 21 deletions k8s/reconciler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ import (
//counterfeiter:generate . TaskDesirer

type Task struct {
client TasksRuntimeClient
taskDesirer TaskDesirer
scheme *runtime.Scheme
logger lager.Logger
client TasksRuntimeClient
jobsClient JobsClient
scheme *runtime.Scheme
logger lager.Logger
}

//counterfeiter:generate . TasksRuntimeClient

type TasksRuntimeClient interface {
UpdateTaskStatus(context.Context, *eiriniv1.Task, eiriniv1.ExecutionStatus) error
UpdateTaskStatus(context.Context, *eiriniv1.Task, eiriniv1.TaskStatus) error
GetTask(context.Context, string, string) (*eiriniv1.Task, error)
GetJobForTask(context.Context, *eiriniv1.Task) (*batchv1.Job, error)
}

func NewTask(logger lager.Logger, client TasksRuntimeClient, taskDesirer TaskDesirer, scheme *runtime.Scheme) *Task {
return &Task{
client: client,
taskDesirer: taskDesirer,
scheme: scheme,
logger: logger,
}
type JobsClient interface {
Desire(ctx context.Context, namespace string, task *api.Task, opts ...shared.Option) error
}

type TaskDesirer interface {
Desire(ctx context.Context, namespace string, task *api.Task, opts ...shared.Option) error
func NewTask(logger lager.Logger, client TasksRuntimeClient, jobsClient JobsClient, scheme *runtime.Scheme) *Task {
return &Task{
client: client,
jobsClient: jobsClient,
scheme: scheme,
logger: logger,
}
}

func (t *Task) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -65,19 +65,21 @@ func (t *Task) Reconcile(ctx context.Context, request reconcile.Request) (reconc
return reconcile.Result{}, fmt.Errorf("could not fetch task: %w", err)
}

job, err := t.client.GetJobForTask(ctx, task)
job, err := t.jobsClient.GetByGUID(ctx, task.Spec.GUID)

if errors.IsNotFound(err) {
logger.Debug("job-not-found")

err = t.taskDesirer.Desire(ctx, task.Namespace, toAPITask(task), t.setOwnerFn(task))
err = t.jobsClient.Desire(ctx, task.Namespace, toAPITask(task), t.setOwnerFn(task))
if err != nil {
logger.Error("desire-task-failed", err)

return reconcile.Result{}, exterrors.Wrap(err, "failed to desire task")
}

if err := t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskStarting); err != nil {
if err := t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskStatus{
ExecutionStatus: eiriniv1.TaskStarting,
}); err != nil {
logger.Error("patch-status-failed", err)
}

Expand Down Expand Up @@ -106,15 +108,25 @@ func (t *Task) updateStatus(ctx context.Context, task *eiriniv1.Task, job *batch
return nil
}

if job.Status.Succeeded > 0 {
return t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskSucceeded)
if job.Status.Succeeded > 0 && job.Status.CompletionTime != nil {
return t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskStatus{
ExecutionStatus: eiriniv1.TaskSucceeded,
EndTime: job.Status.CompletionTime,
})
}

if job.Status.Failed > 0 {
return t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskFailed)
now := metav1.Now()
return t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskStatus{
ExecutionStatus: eiriniv1.TaskFailed,
EndTime: &now,
})
}

return t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskRunning)
return t.client.UpdateTaskStatus(ctx, task, eiriniv1.TaskStatus{
ExecutionStatus: eiriniv1.TaskRunning,
StartTime: job.Status.StartTime,
})
}

func (t *Task) setOwnerFn(task *eiriniv1.Task) func(interface{}) error {
Expand Down
32 changes: 8 additions & 24 deletions k8s/runtimeclient/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package runtimeclient

import (
"context"
"fmt"

"code.cloudfoundry.org/eirini/k8s/utils"
eiriniv1 "code.cloudfoundry.org/eirini/pkg/apis/eirini/v1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -29,28 +26,15 @@ func (t *Tasks) GetTask(ctx context.Context, namespace, name string) (*eiriniv1.
return task, err
}

func (t *Tasks) UpdateTaskStatus(ctx context.Context, task *eiriniv1.Task, newStatus eiriniv1.ExecutionStatus) error {
func (t *Tasks) UpdateTaskStatus(ctx context.Context, task *eiriniv1.Task, newStatus eiriniv1.TaskStatus) error {
newTask := task.DeepCopy()
newTask.Status.ExecutionStatus = newStatus
newTask.Status.ExecutionStatus = newStatus.ExecutionStatus
if newStatus.StartTime != nil {
newTask.Status.StartTime = newStatus.StartTime
}
if newStatus.EndTime != nil {
newTask.Status.EndTime = newStatus.EndTime
}

return t.controllerClient.Status().Patch(ctx, newTask, client.MergeFrom(task))
}

func (t *Tasks) GetJobForTask(ctx context.Context, task *eiriniv1.Task) (*batchv1.Job, error) {
job := &batchv1.Job{}
err := t.controllerClient.Get(ctx, types.NamespacedName{
Namespace: task.Namespace,
Name: jobName(task),
}, job)

return job, err
}

func jobName(task *eiriniv1.Task) string {
name := fmt.Sprintf("%s-%s", task.Spec.AppName, task.Spec.SpaceName)
name = utils.SanitizeName(name, task.Spec.GUID)
if task.Name != "" {
name = fmt.Sprintf("%s-%s", name, task.Spec.Name)
}
return utils.SanitizeNameWithMaxStringLen(name, task.Spec.GUID, 50)
}
2 changes: 1 addition & 1 deletion k8s/stset/desire.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewDesirer(
}
}

func (d *Desirer) Desire(ctx context.Context, namespace string, lrp *api.LRP, opts ...shared.Option) error {
func (d *Desirer) Desire(ctx context.Context, namespace string, lrp *eiriniv1.LRP, opts ...shared.Option) error {
logger := d.logger.Session("desire", lager.Data{"guid": lrp.GUID, "version": lrp.Version, "namespace": namespace})

statefulSetName, err := utils.GetStatefulsetName(lrp)
Expand Down
1 change: 1 addition & 0 deletions k8s/task_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type TaskClient struct {
jobs.Getter
jobs.Deleter
jobs.Lister
jobs.Statuser
}

func NewTaskClient(
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/eirini/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (
)

type TaskStatus struct {
StartTime meta_v1.Time `json:"start_time"`
EndTime meta_v1.Time `json:"end_time"`
StartTime *meta_v1.Time `json:"start_time"`
EndTime *meta_v1.Time `json:"end_time"`
ExecutionStatus ExecutionStatus `json:"execution_status"`
}
10 changes: 8 additions & 2 deletions pkg/apis/eirini/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 24 additions & 10 deletions tests/eats/tasks_crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"code.cloudfoundry.org/eirini/tests"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = FDescribe("Tasks CRD [needs-logs-for: eirini-controller]", func() {
var _ = Describe("Tasks CRD [needs-logs-for: eirini-controller]", func() {
var (
task *eiriniv1.Task
taskName string
Expand Down Expand Up @@ -48,19 +49,19 @@ var _ = FDescribe("Tasks CRD [needs-logs-for: eirini-controller]", func() {
ctx = context.Background()
})

getExecutionStatus := func() (eiriniv1.ExecutionStatus, error) {
getTaskStatus := func() (eiriniv1.TaskStatus, error) {
runningTask, err := fixture.EiriniClientset.
EiriniV1().
Tasks(fixture.Namespace).
Get(ctx, taskName, metav1.GetOptions{})
if err != nil {
return "", err
return eiriniv1.TaskStatus{}, err
}

return runningTask.Status.ExecutionStatus, nil
return runningTask.Status, nil
}

FDescribe("Creating a Task CRD", func() {
Describe("Creating a Task CRD", func() {
JustBeforeEach(func() {
_, err := fixture.EiriniClientset.
EiriniV1().
Expand All @@ -78,13 +79,18 @@ var _ = FDescribe("Tasks CRD [needs-logs-for: eirini-controller]", func() {
})

It("sets the task execution status to starting", func() {
Eventually(getExecutionStatus).Should(Equal(eiriniv1.TaskStarting))
Eventually(getTaskStatus).Should(MatchFields(IgnoreExtras, Fields{
"ExecutionStatus": Equal(eiriniv1.TaskStarting),
}))
})
})

It("runs the task", func() {
Eventually(tests.RequestServiceFn(fixture.Namespace, taskServiceName, port, "/")).Should(ContainSubstring("Dora!"))
Eventually(getExecutionStatus).Should(Equal(eiriniv1.TaskRunning))
Eventually(getTaskStatus).Should(MatchFields(IgnoreExtras, Fields{
"ExecutionStatus": Equal(eiriniv1.TaskRunning),
"StartTime": Not(BeZero()),
}))
})

When("the task image lives in a private registry", func() {
Expand All @@ -109,7 +115,11 @@ var _ = FDescribe("Tasks CRD [needs-logs-for: eirini-controller]", func() {
})

It("marks the Task as succeeded", func() {
Eventually(getExecutionStatus).Should(Equal(eiriniv1.TaskSucceeded))
Eventually(getTaskStatus).Should(MatchFields(IgnoreExtras, Fields{
"ExecutionStatus": Equal(eiriniv1.TaskSucceeded),
"StartTime": Not(BeZero()),
"EndTime": Not(BeZero()),
}))
})
})

Expand All @@ -119,8 +129,12 @@ var _ = FDescribe("Tasks CRD [needs-logs-for: eirini-controller]", func() {
task.Spec.Command = []string{"false"}
})

It("marks the Task as succeeded", func() {
Eventually(getExecutionStatus).Should(Equal(eiriniv1.TaskFailed))
It("marks the Task as failed", func() {
Eventually(getTaskStatus).Should(MatchFields(IgnoreExtras, Fields{
"ExecutionStatus": Equal(eiriniv1.TaskFailed),
"StartTime": Not(BeZero()),
"EndTime": Not(BeZero()),
}))
})
})
})
Expand Down
7 changes: 7 additions & 0 deletions tests/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"sync"

eiriniclient "code.cloudfoundry.org/eirini/pkg/generated/clientset/versioned"
eirinischeme "code.cloudfoundry.org/eirini/pkg/generated/clientset/versioned/scheme"
"github.com/hashicorp/go-multierror"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

// nolint:golint,stylecheck,revive
. "github.com/onsi/ginkgo"
Expand All @@ -33,6 +35,7 @@ const (
type Fixture struct {
Clientset kubernetes.Interface
EiriniClientset eiriniclient.Interface
RuntimeClient runtimeclient.Client
Namespace string
PspName string
KubeConfigPath string
Expand Down Expand Up @@ -75,10 +78,14 @@ func NewFixture(writer io.Writer) *Fixture {
lrpclientset, err := eiriniclient.NewForConfig(config)
Expect(err).NotTo(HaveOccurred(), "failed to create clientset")

runtimeClient, err := runtimeclient.New(config, runtimeclient.Options{Scheme: eirinischeme.Scheme})
Expect(err).NotTo(HaveOccurred(), "failed to create runtime client")

return &Fixture{
KubeConfigPath: kubeConfigPath,
Clientset: clientset,
EiriniClientset: lrpclientset,
RuntimeClient: runtimeClient,
Writer: writer,
nextAvailablePort: basePortNumber + portRange*GinkgoParallelNode(),
portMux: &sync.Mutex{},
Expand Down
57 changes: 57 additions & 0 deletions tests/integration/runtimeclient/runtimeclient_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package integration_test

import (
"context"
"testing"
"time"

eiriniv1 "code.cloudfoundry.org/eirini/pkg/apis/eirini/v1"
"code.cloudfoundry.org/eirini/tests"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

var (
fixture *tests.Fixture
ctx context.Context
)

var _ = BeforeSuite(func() {
fixture = tests.NewFixture(GinkgoWriter)
})

var _ = BeforeEach(func() {
fixture.SetUp()
ctx = context.Background()
})

var _ = AfterEach(func() {
fixture.TearDown()
})

var _ = AfterSuite(func() {
fixture.Destroy()
})

func TestRuntimeclient(t *testing.T) {
SetDefaultEventuallyTimeout(4 * time.Minute)
RegisterFailHandler(Fail)
RunSpecs(t, "Runtime Client")
}

func createTask(ns, name string) *eiriniv1.Task {
task := &eiriniv1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: eiriniv1.TaskSpec{
Command: []string{"echo"},
},
}
task, err := fixture.EiriniClientset.EiriniV1().Tasks(ns).Create(context.Background(), task, metav1.CreateOptions{})

Expect(err).NotTo(HaveOccurred())
return task
}
Loading

0 comments on commit f01ae16

Please sign in to comment.