Skip to content

Commit

Permalink
fix cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rangoo94 committed Sep 4, 2024
1 parent eefd01f commit 55065f5
Show file tree
Hide file tree
Showing 11 changed files with 8 additions and 161 deletions.
11 changes: 3 additions & 8 deletions cmd/tcl/testworkflow-toolkit/commands/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func NewParallelCmd() *cobra.Command {

prevStatus := testkube.QUEUED_TestWorkflowStatus
prevStep := ""
prevIsFinished := false
scheduled := false
for v := range ctrl.WatchLightweight(ctx) {
// Handle error
Expand All @@ -289,18 +288,14 @@ func NewParallelCmd() *cobra.Command {
}

// Handle result change
// TODO: the final status should always have the finishedAt too,
// there should be no need for checking isFinished diff
if v.Status != prevStatus || lastResult.IsFinished() != prevIsFinished || v.Current != prevStep {
if v.Status != prevStatus || v.Current != prevStep {
if v.Status != prevStatus {
log(string(v.Status))
}
updates <- Update{index: index, result: v.Result}
prevStep = v.Current
prevStatus = v.Status
prevIsFinished = lastResult.IsFinished()

// TODO: Maybe wait until end of channel
if lastResult.IsFinished() {
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Result: v.Result})
ctxCancel()
Expand Down Expand Up @@ -376,12 +371,12 @@ func NewParallelCmd() *cobra.Command {
return
}
}
spawn.CreateLogger("worker", descriptions[index], index, params.Count)("warning", "failed to to resume, retrying...", err.Error())
spawn.CreateLogger("worker", descriptions[index], index, params.Count)("warning", "failed to resume, retrying...", err.Error())
time.Sleep(300 * time.Millisecond)
}

// Total failure while retrying
spawn.CreateLogger("worker", descriptions[index], index, params.Count)("warning", "failed to to resume, maximum retries reached. aborting...", err.Error())
spawn.CreateLogger("worker", descriptions[index], index, params.Count)("warning", "failed to resume, maximum retries reached. aborting...", err.Error())
_ = ctrl.Abort(context.Background())
})
}
Expand Down
11 changes: 0 additions & 11 deletions cmd/tcl/testworkflow-toolkit/spawn/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,6 @@ func (r *registry) AllPaused() bool {
return true
}

func (r *registry) CountStatuses() map[testkube.TestWorkflowStatus]int {
r.mu.RLock()
defer r.mu.RUnlock()

statuses := map[testkube.TestWorkflowStatus]int{}
for _, u := range r.statuses {
statuses[u]++
}
return statuses
}

func (r *registry) Get(index int64) testworkflowcontroller.Controller {
r.mu.RLock()
defer r.mu.RUnlock()
Expand Down
4 changes: 0 additions & 4 deletions cmd/tcl/testworkflow-toolkit/spawn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ func ExecuteParallel[T any](run func(int64, *T) bool, items []T, parallelism int
}

func SaveLogsWithController(ctx context.Context, storage artifacts.InternalArtifactStorage, ctrl testworkflowcontroller.Controller, prefix string, index int64) (string, error) {
//fmt.Println("started streaming", ctrl.ResourceID())
//defer func() {
// fmt.Println("finished streaming", ctrl.ResourceID())
//}()
if ctrl == nil {
return "", errors.New("cannot control TestWorkflow's execution")
}
Expand Down
6 changes: 0 additions & 6 deletions cmd/testworkflow-toolkit/artifacts/internalartifactstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ func (s *internalArtifactStorage) SaveStream(artifactPath string, stream io.Read
return err
}

//b, err := io.ReadAll(stream)
//if err != nil && !errors.Is(err, io.EOF) {
// return err
//}
//buf := bytes.NewBuffer(b)

size := -1
if streamL, ok := stream.(withLength); ok {
size = streamL.Len()
Expand Down
18 changes: 2 additions & 16 deletions pkg/testworkflows/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
)

const (
DefaultInitTimeout = 5 * time.Second
)

var (
ErrJobAborted = errors.New("job was aborted")
ErrJobTimeout = errors.New("timeout retrieving job")
Expand Down Expand Up @@ -187,7 +183,6 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
prevNodeName := ""
prevPodIP := ""
prevStatus := testkube.QUEUED_TestWorkflowStatus
prevIsFinished := false
sig := stage.MapSignatureListToInternal(c.signature)
ch := make(chan LightweightNotification)
go func() {
Expand All @@ -202,25 +197,20 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
podIP, _ := c.PodIP()
current := prevCurrent
status := prevStatus
isFinished := prevIsFinished
if v.Value.Result != nil {
if v.Value.Result.Status != nil {
status = *v.Value.Result.Status
} else {
status = testkube.QUEUED_TestWorkflowStatus
}
current = v.Value.Result.Current(sig)
isFinished = v.Value.Result.IsFinished()
}

// TODO: the final status should always have the finishedAt too,
// there should be no need for checking isFinished diff
if nodeName != prevNodeName || isFinished != prevIsFinished || podIP != prevPodIP || prevStatus != status || prevCurrent != current {
if nodeName != prevNodeName || podIP != prevPodIP || prevStatus != status || prevCurrent != current {
prevNodeName = nodeName
prevPodIP = podIP
prevStatus = status
prevCurrent = current
prevIsFinished = isFinished
ch <- LightweightNotification{NodeName: nodeName, PodIP: podIP, Status: status, Current: current, Result: v.Value.Result}
}
}
Expand All @@ -232,11 +222,7 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
reader, writer := io.Pipe()
go func() {
defer func() {
//fmt.Println(c.id, "finishing streaming logs for")
writer.Close()
//fmt.Println(c.id, "finished streaming logs for")
}()
defer writer.Close()
ref := ""
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.watcher, WatchInstrumentedPodOptions{
DisableFollow: !follow,
Expand Down
1 change: 0 additions & 1 deletion pkg/testworkflows/testworkflowcontroller/store/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func (u *updateImmediate) Channel(ctx context.Context) <-chan struct{} {
default:
}

// TODO: Consider some frequent check for the iteration too?
currentIteration := u.iteration.Load()
if iteration != currentIteration {
iteration = currentIteration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,6 @@ func (e *eventsWatcher) Ensure(tsInPast time.Time, timeout time.Duration) (int,
e.mu.Unlock()

// Start reading data
// TODO: use time.After to check for events from Watch?
// TODO: Consider exact time? We may miss some events though
started, _ := e.read(tsInPast.Truncate(time.Second).Add(-1), timeout)
result, _ := <-started
return result.count, result.err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ type ExecutionWatcher interface {
JobErr() error
PodErr() error

ReadJobEventsAt(ts time.Time, timeout time.Duration)
ReadPodEventsAt(ts time.Time, timeout time.Duration)
RefreshPod(timeout time.Duration)
RefreshJob(timeout time.Duration)

Expand Down Expand Up @@ -75,10 +73,7 @@ func (e *executionWatcher) State() ExecutionState {
func (e *executionWatcher) Commit() {
e.mu.Lock()
defer e.mu.Unlock()
// FIXME
uncommited := *e.uncommitted
uncommitedCopy := uncommited
e.state = common.Ptr(uncommitedCopy)
e.state = common.Ptr(*e.uncommitted)
if e.initialCommitInitialized.CompareAndSwap(false, true) {
close(e.initialCommitCh)
}
Expand All @@ -101,14 +96,6 @@ func (e *executionWatcher) PodErr() error {
return e.podWatcher.Err()
}

func (e *executionWatcher) ReadJobEventsAt(ts time.Time, timeout time.Duration) {
e.jobEventsWatcher.Ensure(ts, timeout)
}

func (e *executionWatcher) ReadPodEventsAt(ts time.Time, timeout time.Duration) {
e.podEventsWatcher.Ensure(ts, timeout)
}

func (e *executionWatcher) RefreshPod(timeout time.Duration) {
e.podWatcher.Update(timeout)
}
Expand Down Expand Up @@ -164,7 +151,6 @@ func NewExecutionWatcher(parentCtx context.Context, clientSet kubernetes.Interfa
initialCommitCh: make(chan struct{}),
}

//update := store.NewBatchUpdate(5 * time.Millisecond)
update := store.NewUpdate()
job := store.NewValue[batchv1.Job](ctx, update)
pod := store.NewValue[corev1.Pod](ctx, update)
Expand Down
96 changes: 0 additions & 96 deletions pkg/testworkflows/testworkflowcontroller/watchers/externalclock.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type jobWatcher struct {
listener func(job *batchv1.Job)
started atomic.Bool
watching atomic.Bool
startedCh chan struct{} // TODO: Ensure there is no memory leak
startedCh chan struct{}
ctx context.Context
cancel context.CancelCauseFunc
mu sync.Mutex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type podWatcher struct {
listener func(*corev1.Pod)
started atomic.Bool
watching atomic.Bool
startedCh chan struct{} // TODO: Ensure there is no memory leak
startedCh chan struct{}
ctx context.Context
cancel context.CancelCauseFunc
mu sync.Mutex
Expand Down

0 comments on commit 55065f5

Please sign in to comment.