Skip to content

Commit

Permalink
Merge pull request #225 from whywaita/fix/rescue-job-revenge
Browse files Browse the repository at this point in the history
redesign: Refactor rescue job (2)
  • Loading branch information
whywaita authored Jan 15, 2025
2 parents 226212c + e2bdced commit 56a2422
Show file tree
Hide file tree
Showing 17 changed files with 608 additions and 244 deletions.
8 changes: 8 additions & 0 deletions cmd/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"log"
"net/http"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -31,6 +33,12 @@ func init() {
}

func main() {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
go func() {
log.Fatal(http.ListenAndServe("localhost:6060", nil))
}()

myshoes, err := newShoes()
if err != nil {
log.Fatalln(err)
Expand Down
141 changes: 141 additions & 0 deletions pkg/datastore/github.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package datastore

import (
"context"
"fmt"
"net/url"
"sort"
"strings"
"sync"
"time"

"github.com/whywaita/myshoes/pkg/logger"

"github.com/google/go-github/v47/github"
"github.com/whywaita/myshoes/pkg/gh"
)

// NewClientInstallationByRepo create a client of GitHub using installation ID from repo name
func NewClientInstallationByRepo(ctx context.Context, ds Datastore, repo string) (*github.Client, *Target, error) {
target, err := SearchRepo(ctx, ds, repo)
if err != nil {
return nil, nil, fmt.Errorf("failed to search repository: %w", err)
}

installationID, err := gh.IsInstalledGitHubApp(ctx, target.Scope)
if err != nil {
return nil, nil, fmt.Errorf("failed to get installation ID: %w", err)
}

client, err := gh.NewClientInstallation(installationID)
if err != nil {
return nil, nil, fmt.Errorf("failed to create client: %w", err)
}

return client, target, nil
}

// PendingWorkflowRunWithTarget is struct for pending workflow run
type PendingWorkflowRunWithTarget struct {
Target *Target
WorkflowRun *github.WorkflowRun
}

// GetPendingWorkflowRunByRecentRepositories get pending workflow runs by recent active repositories
func GetPendingWorkflowRunByRecentRepositories(ctx context.Context, ds Datastore) ([]PendingWorkflowRunWithTarget, error) {
recentActiveRepositories, err := getRecentRepositories(ctx, ds)
if err != nil {
return nil, fmt.Errorf("failed to get recent repositories: %w", err)
}

var pendingRuns []PendingWorkflowRunWithTarget
var wg sync.WaitGroup
var mu sync.Mutex
for _, repoRawURL := range recentActiveRepositories {
wg.Add(1)
go func(repoRawURL string) {
defer wg.Done()
u, err := url.Parse(repoRawURL)
if err != nil {
logger.Logf(false, "failed to get pending run by recent repositories: failed to parse repository url: %+v", err)
return
}
fullName := strings.TrimPrefix(u.Path, "/")
client, target, err := NewClientInstallationByRepo(ctx, ds, fullName)
if err != nil {
logger.Logf(false, "failed to get pending run by recent repositories: failed to create a client of GitHub by repo (full_name: %s) %+v", fullName, err)
return
}

owner, repo := gh.DivideScope(fullName)
pendingRunsByRepo, err := getPendingRunByRepo(ctx, client, owner, repo)
if err != nil {
logger.Logf(false, "failed to get pending run by recent repositories: failed to get pending run by repo (full_name: %s) %+v", fullName, err)
return
}
mu.Lock()
for _, run := range pendingRunsByRepo {
pendingRuns = append(pendingRuns, PendingWorkflowRunWithTarget{
Target: target,
WorkflowRun: run,
})
}
mu.Unlock()
}(repoRawURL)
}

wg.Wait()

return pendingRuns, nil
}

func getPendingRunByRepo(ctx context.Context, client *github.Client, owner, repo string) ([]*github.WorkflowRun, error) {
runs, err := gh.ListWorkflowRunsNewest(ctx, client, owner, repo, 50)
if err != nil {
return nil, fmt.Errorf("failed to list runs: %w", err)
}

var pendingRuns []*github.WorkflowRun
for _, r := range runs {
if r.GetStatus() == "queued" || r.GetStatus() == "pending" {
oldMinutes := 10
sinceMinutes := time.Since(r.CreatedAt.Time).Minutes()
if sinceMinutes >= float64(oldMinutes) {
logger.Logf(false, "run %d is pending over %d minutes, So will enqueue (repo: %s/%s)", r.GetID(), oldMinutes, owner, repo)
pendingRuns = append(pendingRuns, r)
} else {
logger.Logf(true, "run %d is pending, but not over %d minutes. So ignore (since: %f minutes, repo: %s/%s)", r.GetID(), oldMinutes, sinceMinutes, owner, repo)
}
}
}

return pendingRuns, nil
}

func getRecentRepositories(ctx context.Context, ds Datastore) ([]string, error) {
recent := time.Now().Add(-1 * time.Hour)
recentRunners, err := ds.ListRunnersLogBySince(ctx, recent)
if err != nil {
return nil, fmt.Errorf("failed to get targets from datastore: %w", err)
}

// sort by created_at
sort.SliceStable(recentRunners, func(i, j int) bool {
return recentRunners[i].CreatedAt.After(recentRunners[j].CreatedAt)
})

// unique repositories
recentActiveRepositories := make(map[string]struct{})
for _, r := range recentRunners {
u := r.RepositoryURL
if _, ok := recentActiveRepositories[u]; !ok {
recentActiveRepositories[u] = struct{}{}
}
}
var result []string
for repository := range recentActiveRepositories {
result = append(result, repository)
}

return result, nil
}
3 changes: 2 additions & 1 deletion pkg/datastore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Datastore interface {
CreateRunner(ctx context.Context, runner Runner) error
ListRunners(ctx context.Context) ([]Runner, error)
ListRunnersByTargetID(ctx context.Context, targetID uuid.UUID) ([]Runner, error)
ListRunnersLogBySince(ctx context.Context, since time.Time) ([]Runner, error)
GetRunner(ctx context.Context, id uuid.UUID) (*Runner, error)
DeleteRunner(ctx context.Context, id uuid.UUID, deletedAt time.Time, reason RunnerStatus) error

Expand Down Expand Up @@ -130,7 +131,7 @@ func UpdateTargetStatus(ctx context.Context, ds Datastore, targetID uuid.UUID, n
func SearchRepo(ctx context.Context, ds Datastore, repo string) (*Target, error) {
sep := strings.Split(repo, "/")
if len(sep) != 2 {
return nil, fmt.Errorf("incorrect repo format ex: orgs/repo")
return nil, fmt.Errorf("incorrect repo format ex: orgs/repo (input: %s)", repo)
}

// use repo scope if set repo
Expand Down
15 changes: 15 additions & 0 deletions pkg/datastore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ func (m *Memory) ListRunnersByTargetID(ctx context.Context, targetID uuid.UUID)
return runners, nil
}

// ListRunnersLogBySince ListRunnerLog get a runners since time
func (m *Memory) ListRunnersLogBySince(ctx context.Context, since time.Time) ([]datastore.Runner, error) {
m.mu.Lock()
defer m.mu.Unlock()

var runners []datastore.Runner
for _, r := range m.runners {
if r.CreatedAt.After(since) {
runners = append(runners, r)
}
}

return runners, nil
}

// GetRunner get a runner
func (m *Memory) GetRunner(ctx context.Context, id uuid.UUID) (*datastore.Runner, error) {
m.mu.Lock()
Expand Down
17 changes: 17 additions & 0 deletions pkg/datastore/mysql/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ func (m *MySQL) ListRunnersByTargetID(ctx context.Context, targetID uuid.UUID) (
return runners, nil
}

// ListRunnersLogBySince ListRunnerLog get a runners since time
func (m *MySQL) ListRunnersLogBySince(ctx context.Context, since time.Time) ([]datastore.Runner, error) {
var runners []datastore.Runner

query := `SELECT runner_id, shoes_type, ip_address, target_id, cloud_id, created_at, updated_at, resource_type, repository_url, request_webhook, runner_user, provider_url FROM runner_detail WHERE created_at > ?`
err := m.Conn.SelectContext(ctx, &runners, query, since)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, datastore.ErrNotFound
}

return nil, fmt.Errorf("failed to execute SELECT query: %w", err)
}

return runners, nil
}

// GetRunner get a runner
func (m *MySQL) GetRunner(ctx context.Context, id uuid.UUID) (*datastore.Runner, error) {
var r datastore.Runner
Expand Down
63 changes: 63 additions & 0 deletions pkg/datastore/mysql/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,70 @@ func TestMySQL_ListRunnersNotReturnDeleted(t *testing.T) {
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("mismatch (-want +got):\n%s", diff)
}
}

func TestMySQL_ListRunnersLogBySince(t *testing.T) {
testDatastore, teardown := testutils.GetTestDatastore()
defer teardown()

if err := testDatastore.CreateTarget(context.Background(), datastore.Target{
UUID: testTargetID,
Scope: testScopeRepo,
GitHubToken: testGitHubToken,
TokenExpiredAt: testTime,
ResourceType: datastore.ResourceTypeNano,
}); err != nil {
t.Fatalf("failed to create target: %+v", err)
}

u := "00000000-0000-0000-0000-00000000000%d"

for i := 1; i < 3; i++ {
input := datastore.Runner{
UUID: testRunnerID,
ShoesType: "shoes-test",
TargetID: testTargetID,
CloudID: "mycloud-uuid",
ResourceType: datastore.ResourceTypeNano,
RepositoryURL: "https://github.com/octocat/Hello-World",
RequestWebhook: "{}",
}
input.UUID = uuid.FromStringOrNil(fmt.Sprintf(u, i))
err := testDatastore.CreateRunner(context.Background(), input)
if err != nil {
t.Fatalf("failed to create runner: %+v", err)
}
time.Sleep(500 * time.Millisecond)
}

recent := time.Now().Add(-10 * time.Second)
got, err := testDatastore.ListRunnersLogBySince(context.Background(), recent)
if err != nil {
t.Fatalf("failed to get runners: %+v", err)
}
for i := range got {
got[i].CreatedAt = time.Time{}
got[i].UpdatedAt = time.Time{}
}

var want []datastore.Runner
for i := 1; i < 3; i++ {
r := datastore.Runner{
UUID: testRunnerID,
ShoesType: "shoes-test",
TargetID: testTargetID,
CloudID: "mycloud-uuid",
ResourceType: datastore.ResourceTypeNano,
RepositoryURL: "https://github.com/octocat/Hello-World",
RequestWebhook: "{}",
}
r.UUID = uuid.FromStringOrNil(fmt.Sprintf(u, i))
want = append(want, r)
}

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("mismatch (-want +got):\n%s", diff)
}
}

func TestMySQL_GetRunner(t *testing.T) {
Expand Down
62 changes: 62 additions & 0 deletions pkg/gh/installation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,65 @@ func _listInstallations(ctx context.Context) ([]*github.Installation, error) {
}
return installations, nil
}

func listAppsInstalledRepo(ctx context.Context, installationID int64) ([]*github.Repository, error) {
if cachedRs, found := responseCache.Get(getCacheInstalledRepoKey(installationID)); found {
return cachedRs.([]*github.Repository), nil
}

inst, err := _listAppsInstalledRepo(ctx, installationID)
if err != nil {
return nil, fmt.Errorf("failed to list installations: %w", err)
}

responseCache.Set(getCacheInstalledRepoKey(installationID), inst, 1*time.Hour)

return _listAppsInstalledRepo(ctx, installationID)
}

func getCacheInstalledRepoKey(installationID int64) string {
return fmt.Sprintf("installed-repo-%d", installationID)
}

func _listAppsInstalledRepo(ctx context.Context, installationID int64) ([]*github.Repository, error) {
clientInstallation, err := NewClientInstallation(installationID)
if err != nil {
return nil, fmt.Errorf("failed to create a client installation: %w", err)
}

var opts = &github.ListOptions{
Page: 0,
PerPage: 100,
}

var repositories []*github.Repository
for {
logger.Logf(true, "get list of repository from installation, page: %d, now all repositories: %d", opts.Page, len(repositories))
lr, resp, err := clientInstallation.Apps.ListRepos(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to get installed repositories: %w", err)
}
repositories = append(repositories, lr.Repositories...)
if resp.NextPage == 0 {
break
}
opts.Page = resp.NextPage
}

return repositories, nil
}

// PurgeInstallationCache purges the cache of installations
func PurgeInstallationCache(ctx context.Context) error {
installations, err := listInstallations(ctx)
if err != nil {
return fmt.Errorf("failed to get installations: %w", err)
}

for _, installation := range installations {
responseCache.Delete(getCacheInstalledRepoKey(installation.GetID()))
}

responseCache.Delete(getCacheInstallationsKey())
return nil
}
Loading

0 comments on commit 56a2422

Please sign in to comment.