diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types_util.go b/apis/flinkcluster/v1beta1/flinkcluster_types_util.go index b8dbb09e..5e5cb148 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types_util.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types_util.go @@ -60,7 +60,7 @@ func (j *JobStatus) IsSavepointUpToDate(spec *JobSpec, compareTime time.Time) bo } var stateMaxAge = int(*spec.MaxStateAgeToRestoreSeconds) - return !hasTimeElapsed(j.SavepointTime, compareTime, stateMaxAge) + return !util.HasTimeElapsed(j.SavepointTime, compareTime, stateMaxAge) } // ShouldRestart returns true if the controller should restart failed job. @@ -121,32 +121,6 @@ func (r *RevisionStatus) IsUpdateTriggered() bool { return r.CurrentRevision != r.NextRevision } -// TimeConverter converts between time.Time and string. -type TimeConverter struct{} - -// FromString converts string to time.Time. -func (tc *TimeConverter) FromString(timeStr string) time.Time { - timestamp, err := time.Parse( - time.RFC3339, timeStr) - if err != nil { - panic(fmt.Sprintf("Failed to parse time string: %s", timeStr)) - } - return timestamp -} - -// ToString converts time.Time to string. -func (tc *TimeConverter) ToString(timestamp time.Time) string { - return timestamp.Format(time.RFC3339) -} - -// Check time has passed -func hasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool { - tc := &TimeConverter{} - timeToCheck := tc.FromString(timeToCheckStr) - intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second))) - return now.After(intervalPassedTime) -} - func isBlank(s *string) bool { return s == nil || strings.TrimSpace(*s) == "" } diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go b/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go index 30fa60bc..fceba3db 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go @@ -20,12 +20,13 @@ import ( "testing" "time" + "github.com/spotify/flink-on-k8s-operator/internal/util" "gotest.tools/v3/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestIsSavepointUpToDate(t *testing.T) { - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} var savepointTime = time.Now() var jobCompletionTime = savepointTime.Add(time.Second * 100) var maxStateAgeToRestoreSeconds = int32(300) @@ -92,7 +93,7 @@ func TestIsSavepointUpToDate(t *testing.T) { } func TestShouldRestartJob(t *testing.T) { - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} var restartOnFailure = JobRestartPolicyFromSavepointOnFailure var neverRestart = JobRestartPolicyNever var maxStateAgeToRestoreSeconds = int32(300) // 5 min diff --git a/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go b/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go index aaeca418..02b01948 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/spotify/flink-on-k8s-operator/internal/util" "k8s.io/apimachinery/pkg/api/resource" "gotest.tools/v3/assert" @@ -371,7 +372,7 @@ func TestTaskManagerDeploymentTypeUpdate(t *testing.T) { func TestUpdateJob(t *testing.T) { var validator = &Validator{} - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} var maxStateAge = time.Duration(MaxStateAgeToRestore) var jarFileNew = "gs://my-bucket/myjob-v2.jar" diff --git a/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go b/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go index e558fba3..40d11b89 100644 --- a/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go +++ b/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go @@ -1075,21 +1075,6 @@ func (in *TaskManagerStatus) DeepCopy() *TaskManagerStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *TimeConverter) DeepCopyInto(out *TimeConverter) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TimeConverter. -func (in *TimeConverter) DeepCopy() *TimeConverter { - if in == nil { - return nil - } - out := new(TimeConverter) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Validator) DeepCopyInto(out *Validator) { *out = *in diff --git a/internal/util/time.go b/internal/util/time.go index 64e25a3b..1f8dd313 100644 --- a/internal/util/time.go +++ b/internal/util/time.go @@ -23,6 +23,14 @@ func (tc *TimeConverter) ToString(timestamp time.Time) string { return timestamp.Format(time.RFC3339) } +// Check time has passed +func HasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool { + tc := &TimeConverter{} + timeToCheck := tc.FromString(timeToCheckStr) + intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second))) + return now.After(intervalPassedTime) +} + // SetTimestamp sets the current timestamp to the target. func SetTimestamp(target *string) { var tc = &TimeConverter{}