Skip to content

Commit

Permalink
feat(wft): refact cron trigger for workflow trigger (#816)
Browse files Browse the repository at this point in the history
* feat(wft): refact cron trigger for workflow trigger

* fix error
  • Loading branch information
supereagle authored and caicloud-bot committed Mar 10, 2019
1 parent 1477113 commit be15a2a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 50 deletions.
23 changes: 15 additions & 8 deletions pkg/apis/cyclone/v1alpha1/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,32 @@ type WorkflowTrigger struct {
type TriggerType string

const (
// ScheduledTrigger indicates scheduled trigger
ScheduledTrigger = "Schedule"
// WebhookTrigger indicates webhook trigger
WebhookTrigger = "Webhook"
// TriggerTypeCron indicates cron trigger
TriggerTypeCron TriggerType = "Cron"

// TriggerTypeWebhook indicates webhook trigger
TriggerTypeWebhook TriggerType = "Webhook"
)

// WorkflowTriggerSpec defines workflow trigger definition.
type WorkflowTriggerSpec struct {
// Type of this trigger, Schedule or Webhook
Type TriggerType `json:"triggerType"`
// Parameters of the trigger, for Schedule type trigger, "schedule"
// parameter is required
// Type of this trigger, Cron or Webhook
Type TriggerType `json:"type"`
// Parameters of the trigger to run workflow
Parameters []ParameterItem `json:"parameters"`
// CronTrigger represents cron trigger config.
Cron CronTrigger `json:"cron,omitempty"`
// Whether this trigger is disabled, if set to true, no workflow will be triggered
Disabled bool `json:"disabled"`
// Spec to run the workflow
WorkflowRunSpec `json:",inline"`
}

// CronTrigger represents the cron trigger policy.
type CronTrigger struct {
Schedule string `json:"schedule"`
}

// WorkflowTriggerStatus describes status of a workflow trigger
type WorkflowTriggerStatus struct {
// How many times this trigger got triggered
Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/cyclone/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 15 additions & 42 deletions pkg/workflow/controller/handlers/workflowtrigger/cron_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,21 @@ package workflowtrigger

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/robfig/cron"
log "github.com/sirupsen/logrus"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/workflow/common"
)

const (
// KeyTemplate ...
KeyTemplate = "%s/%s"

// ParamTimeZoneOffset is time zone offset to UTC, in minutes
// -480 for Asia/Shanghai(+8)
ParamTimeZoneOffset = "timeZoneOffset"
// ParamSchedule ...
ParamSchedule = "schedule"
)

// CronTrigger ...
Expand All @@ -38,14 +31,14 @@ type CronTrigger struct {
Manage *CronTriggerManager
}

// CronTriggerManager ...
// CronTriggerManager represents manager for cron triggers.
type CronTriggerManager struct {
Client clientset.Interface
CronTriggerMap map[string]*CronTrigger
mutex sync.Mutex
}

// NewTriggerManager ...
// NewTriggerManager returns a cron trigger manager.
func NewTriggerManager(client clientset.Interface) *CronTriggerManager {
return &CronTriggerManager{
Client: client,
Expand All @@ -54,7 +47,7 @@ func NewTriggerManager(client clientset.Interface) *CronTriggerManager {
}
}

// AddTrigger ...
// AddTrigger adds one cron trigger.
func (m *CronTriggerManager) AddTrigger(trigger *CronTrigger) {
wftKey := trigger.getKeyFromTrigger()
m.mutex.Lock()
Expand All @@ -67,7 +60,7 @@ func (m *CronTriggerManager) AddTrigger(trigger *CronTrigger) {
}
}

// DeleteTrigger ...
// DeleteTrigger deletes one cron trigger.
func (m *CronTriggerManager) DeleteTrigger(wftKey string) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -80,7 +73,7 @@ func (m *CronTriggerManager) DeleteTrigger(wftKey string) {
}
}

// ToWorkflowTrigger ...
// ToWorkflowTrigger converts to workflow trigger.
func ToWorkflowTrigger(obj interface{}) (*v1alpha1.WorkflowTrigger, error) {

wft, ok := obj.(*v1alpha1.WorkflowTrigger)
Expand All @@ -95,9 +88,8 @@ func (t *CronTrigger) getKeyFromTrigger() string {
return fmt.Sprintf(KeyTemplate, t.Namespace, t.WorkflowTriggerName)
}

// Run ...
// Run triggers the workflows.
func (t *CronTrigger) Run() {

if t.WorkflowRun.Labels == nil {
t.WorkflowRun.Labels = make(map[string]string)
}
Expand Down Expand Up @@ -130,26 +122,8 @@ func getParamValue(items []v1alpha1.ParameterItem, key string) (string, bool) {
return "", false
}

// CreateCron ...
// CreateCron creates a cron trigger from workflow trigger, and add it to cron trigger manager.
func (m *CronTriggerManager) CreateCron(wft *v1alpha1.WorkflowTrigger) {

schedule, has := getParamValue(wft.Spec.Parameters, ParamSchedule)
if !has {
log.WithField("wft", wft.Name).Warn("Parameter 'schedule' not set in WorkflowTrigger spec")
return
}

timezone, has := getParamValue(wft.Spec.Parameters, ParamTimeZoneOffset)
if !has {
timezone = "0"
}

minuteOffUTC, err := strconv.Atoi(timezone)
if err != nil {
log.Warnf("can not parse timezone(%s) to int", timezone)
minuteOffUTC = 0
}

ct := &CronTrigger{
Namespace: wft.Namespace,
WorkflowTriggerName: wft.Name,
Expand All @@ -161,9 +135,8 @@ func (m *CronTriggerManager) CreateCron(wft *v1alpha1.WorkflowTrigger) {

ct.WorkflowRun = wfr

c := cron.NewWithLocation(time.FixedZone("userZone", -1*60*minuteOffUTC))
err = c.AddJob(schedule, ct)
if err != nil {
c := cron.New()
if err := c.AddJob(wft.Spec.Cron.Schedule, ct); err != nil {
log.Errorf("can not create Cron job: %s", err)
return
}
Expand All @@ -178,13 +151,13 @@ func (m *CronTriggerManager) CreateCron(wft *v1alpha1.WorkflowTrigger) {
}
}

// UpdateCron ...
// UpdateCron updates cron trigger based on workflow trigger.
func (m *CronTriggerManager) UpdateCron(wft *v1alpha1.WorkflowTrigger) {
m.DeleteCron(wft)
m.CreateCron(wft)
}

// DeleteCron ...
// DeleteCron deletes cron trigger from cron trigger manager.
func (m *CronTriggerManager) DeleteCron(wft *v1alpha1.WorkflowTrigger) {
wftKey := getKeyFromWorkflowTrigger(wft)
m.DeleteTrigger(wftKey)
Expand Down

0 comments on commit be15a2a

Please sign in to comment.