Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Support concurrency for Actions #32751

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 76 additions & 102 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"

"github.com/nektos/act/pkg/jobparser"
"xorm.io/builder"
)

Expand All @@ -47,6 +46,8 @@ type ActionRun struct {
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
ConcurrencyGroup string
ConcurrencyCancel bool
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Expand Down Expand Up @@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
return run.ScheduleID > 0
}

func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
_, err := db.GetEngine(ctx).ID(repo.ID).
SetExpr("num_action_runs",
builder.Select("count(*)").From("action_run").
Expand Down Expand Up @@ -196,13 +197,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
// Find all runs in the specified repository, reference, and workflow with non-final status
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
opts := &FindRunOptions{
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
TriggerEvent: event,
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
}
return CancelPreviousJobsWithOpts(ctx, opts)
}

// CancelPreviousJobs cancels all previous jobs with opts
func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error {
// Find all runs by opts
runs, total, err := db.FindAndCount[ActionRun](ctx, opts)
if err != nil {
return err
}
Expand All @@ -222,119 +230,50 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
return err
}

// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}

// Continue with the next job.
continue
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
}
if err := CancelJobs(ctx, jobs); err != nil {
return err
}
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
return nil
}

// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()

index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
if err != nil {
return err
}
run.Index = index
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

if err := db.Insert(ctx, run); err != nil {
return err
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

if run.Repo == nil {
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
if err != nil {
return err
}
run.Repo = repo
}
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}

if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}

runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
return err
// Continue with the next job.
continue
}
payload, _ := v.Marshal()
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
} else {
hasWaiting = true
}
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
IsForkPullRequest: run.IsForkPullRequest,
Name: job.Name,
WorkflowPayload: payload,
JobID: id,
Needs: needs,
RunsOn: job.RunsOn(),
Status: status,
})
}
if err := db.Insert(ctx, runJobs); err != nil {
return err
}

// if there is a job in the waiting status, increase tasks version.
if hasWaiting {
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
}
}

return committer.Commit()
return nil
}

func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
Expand Down Expand Up @@ -426,7 +365,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}
run.Repo = repo
}
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
}
Expand All @@ -435,3 +374,38 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}

type ActionRunIndex db.ResourceIndex

func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
// cancel previous jobs in the same concurrency group
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RepoID: actionRunJob.RepoID,
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
Statuses: []Status{
StatusRunning,
StatusWaiting,
StatusBlocked,
},
})
if err != nil {
return fmt.Errorf("find previous jobs: %w", err)
}

return CancelJobs(ctx, previousJobs)
}

func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
if actionRunJob.ConcurrencyCancel {
return false, CancelConcurrentJobs(ctx, actionRunJob)
}

concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
RepoID: actionRunJob.RepoID,
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting},
})
if err != nil {
return false, fmt.Errorf("count waiting jobs: %w", err)
}

return concurrentJobsNum > 0, nil
}
14 changes: 10 additions & 4 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ type ActionRunJob struct {
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
Status Status `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`

RawConcurrencyGroup string // raw concurrency.group
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
ConcurrencyGroup string // evaluated concurrency.group
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress

Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
}

func init() {
Expand Down
22 changes: 16 additions & 6 deletions models/actions/run_job_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,21 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
return jobs.LoadRuns(ctx, withRepo)
}

func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) {
runList := make(RunList, 0, len(runIDs))
err := db.GetEngine(ctx).In("id", runIDs).Find(&runList)
return runList, err
}

type FindRunJobOptions struct {
db.ListOptions
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
ConcurrencyGroup string
}

func (opts FindRunJobOptions) ToConds() builder.Cond {
Expand All @@ -76,5 +83,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
if opts.UpdatedBefore > 0 {
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
}
if opts.ConcurrencyGroup != "" {
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
}
return cond
}
20 changes: 12 additions & 8 deletions models/actions/run_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error {

type FindRunOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
ConcurrencyGroup string
}

func (opts FindRunOptions) ToConds() builder.Cond {
Expand Down Expand Up @@ -99,6 +100,9 @@ func (opts FindRunOptions) ToConds() builder.Cond {
if opts.TriggerEvent != "" {
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
}
if len(opts.ConcurrencyGroup) > 0 {
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
}
return cond
}

Expand Down
1 change: 1 addition & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func prepareMigrationTasks() []*migration {
newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices),
newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch),
newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable),
// TODO: AddActionsConcurrency
}
return preparedMigrations
}
Expand Down
28 changes: 28 additions & 0 deletions models/migrations/v1_23/v312.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_23 //nolint

import (
"xorm.io/xorm"
)

func AddActionsConcurrency(x *xorm.Engine) error {
type ActionRun struct {
ConcurrencyGroup string
ConcurrencyCancel bool
}

if err := x.Sync(new(ActionRun)); err != nil {
return err
}

type ActionRunJob struct {
RawConcurrencyGroup string
RawConcurrencyCancel string
ConcurrencyGroup string
ConcurrencyCancel bool
}

return x.Sync(new(ActionRunJob))
}
Loading
Loading