Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PMM-12896 Add limit for actions/jobs executed on the same DB at the same time #2898

Merged
merged 35 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
972bb1c
Add limit for actions/jobs executed on the same DB at the same time
artemgavrilov Mar 13, 2024
5a5a6a7
PMM-12896 Fix
artemgavrilov Mar 13, 2024
4463f1a
PMM-12896 Fixes
artemgavrilov Mar 13, 2024
af550fb
PMM-12896 Fix
artemgavrilov Mar 13, 2024
06743cf
PMM-12896 Fix tests
artemgavrilov Mar 14, 2024
dba0886
PMM-12896 Improvements, fixes, comments, tests
artemgavrilov Mar 14, 2024
4211fad
PMM-12896 Make per DB capacity configurable
artemgavrilov Mar 14, 2024
d5eefa2
PMM-12896 Fix
artemgavrilov Mar 14, 2024
a77e999
PMM-12896 Improve tests
artemgavrilov Mar 14, 2024
e003f5c
PMM-12896 Linter fixes
artemgavrilov Mar 14, 2024
be6e0bf
PMM-12896 Refactoring
artemgavrilov Mar 15, 2024
fb3a35f
PMM-12896 Fix comment
artemgavrilov Mar 15, 2024
2971596
Merge remote-tracking branch 'origin/main' into PMM-12896-limit-conns…
artemgavrilov Mar 19, 2024
e59a244
PMM-12896 Fix DSN method for PT mysql summary action
artemgavrilov Mar 19, 2024
306b2f5
PMM-12896 Fix bug in local semaphores releasing
artemgavrilov Mar 19, 2024
74f18c4
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
55b7e05
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
5d35524
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
206ca7c
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
5229182
PMM-12896 Fix test
artemgavrilov Mar 19, 2024
477790b
PMM-12896 Fix tests
artemgavrilov Mar 19, 2024
fab890b
Merge remote-tracking branch 'origin/main' into PMM-12896-limit-conns…
artemgavrilov Mar 19, 2024
428e5f1
Revert "PMM-12896 Fix tests"
artemgavrilov Mar 19, 2024
9c3b9f7
PMM-12896 Fix tests
artemgavrilov Mar 19, 2024
9484c03
PMM-12896 Use timeout only for job/action exectuion, not for resource…
artemgavrilov Mar 20, 2024
1156da4
PMM-12896 Refactoring
artemgavrilov Mar 20, 2024
5ab80bd
Update agent/config/config.go
artemgavrilov Mar 21, 2024
a779937
Update agent/runner/actions/mongodb_explain_action.go
artemgavrilov Mar 21, 2024
e9fec6f
Update agent/runner/actions/postgresql_query_select_action.go
artemgavrilov Mar 21, 2024
fb5530f
Merge remote-tracking branch 'origin/main' into PMM-12896-limit-conns…
artemgavrilov Mar 21, 2024
d1c0a9e
PMM-12896 Refactoring
artemgavrilov Mar 21, 2024
be9f781
PMM-12896 Mute linter
artemgavrilov Mar 21, 2024
270c974
Merge branch 'main' into PMM-12896-limit-conns-num-from-jobs-runner
artemgavrilov Mar 25, 2024
154761d
Merge branch 'main' into PMM-12896-limit-conns-num-from-jobs-runner
yurkovychv May 20, 2024
256c8b1
Merge branch 'main' into PMM-12896-limit-conns-num-from-jobs-runner
artemgavrilov May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agent/runner/actions/mongodb_explain_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/percona/percona-toolkit/src/go/mongolib/proto"
Expand All @@ -44,7 +43,7 @@ var errCannotExplain = fmt.Errorf("cannot explain this type of query")

// NewMongoDBExplainAction creates a MongoDB EXPLAIN query Action.
func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) (Action, error) {
dsn, err := templates.RenderDSN(params.Dsn, params.TextFiles, filepath.Join(tempDir, strings.ToLower(mongoDBExplainActionType), id))
dsn, err := templates.RenderDSN(params.Dsn, params.TextFiles, filepath.Join(tempDir, mongoDBExplainActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
3 changes: 1 addition & 2 deletions agent/runner/actions/mongodb_query_admincommand_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package actions
import (
"context"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -49,7 +48,7 @@ func NewMongoDBQueryAdmincommandAction(
arg interface{},
tempDir string,
) (Action, error) {
dsn, err := templates.RenderDSN(dsn, files, filepath.Join(tempDir, strings.ToLower(mongoDBQueryAdminCommandActionType), id))
dsn, err := templates.RenderDSN(dsn, files, filepath.Join(tempDir, mongoDBQueryAdminCommandActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
4 changes: 2 additions & 2 deletions agent/runner/actions/mysql_explain_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestMySQLExplain(t *testing.T) {
OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT,
}
a, err := NewMySQLExplainAction("", time.Second, params)
assert.Regexp(t, `Query to EXPLAIN is empty`, err.Error())
assert.ErrorContains(t, err, `Query to EXPLAIN is empty`)
assert.Nil(t, a)
})

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestMySQLExplain(t *testing.T) {
OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT,
}
a, err := NewMySQLExplainAction("", time.Second, params)
assert.Error(t, err, "EXPLAIN failed because the query was too long and trimmed. Set max-query-length to a larger value.")
assert.ErrorContains(t, err, "EXPLAIN failed because the query was too long and trimmed. Set max-query-length to a larger value.")
assert.Nil(t, a)
})

Expand Down
2 changes: 1 addition & 1 deletion agent/runner/actions/postgresql_query_select_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewPostgreSQLQuerySelectAction(id string, timeout time.Duration, params *ag
return nil, errors.New("query contains ';'")
}

dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, strings.ToLower(postgreSQLQuerySelectActionType), id))
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, postgreSQLQuerySelectActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
3 changes: 1 addition & 2 deletions agent/runner/actions/postgresql_query_show_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"database/sql"
"path/filepath"
"strings"
"time"

"github.com/lib/pq"
Expand All @@ -39,7 +38,7 @@ type postgresqlQueryShowAction struct {

// NewPostgreSQLQueryShowAction creates PostgreSQL SHOW query Action.
func NewPostgreSQLQueryShowAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_PostgreSQLQueryShowParams, tempDir string) (Action, error) {
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, strings.ToLower(postgreSQLQueryShowActionType), id))
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, postgreSQLQueryShowActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewPostgreSQLShowCreateTableAction(
params *agentpb.StartActionRequest_PostgreSQLShowCreateTableParams,
tempDir string,
) (Action, error) {
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, strings.ToLower(postgreSQLShowCreateTableActionType), id))
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, postgreSQLShowCreateTableActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/runner/actions/postgresql_show_index_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type postgresqlShowIndexAction struct {
// NewPostgreSQLShowIndexAction creates PostgreSQL SHOW INDEX Action.
// This is an Action that can run `SHOW INDEX` command on PostgreSQL service with given DSN.
func NewPostgreSQLShowIndexAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_PostgreSQLShowIndexParams, tempDir string) (Action, error) {
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, strings.ToLower(postgreSQLShowIndexActionType), id))
dsn, err := templates.RenderDSN(params.Dsn, params.TlsFiles, filepath.Join(tempDir, postgreSQLShowIndexActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
4 changes: 4 additions & 0 deletions agent/runner/actions/pt_mysql_summary_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (a *ptMySQLSummaryAction) Type() string {

// DSN returns a DSN for the Action.
func (a *ptMySQLSummaryAction) DSN() string {
if a.params.Socket != "" {
return a.params.Socket
}

return net.JoinHostPort(a.params.Host, strconv.FormatUint(uint64(a.params.Port), 10))
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
13 changes: 13 additions & 0 deletions agent/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (r *Runner) acquire(ctx context.Context, token string) error {
}

if err := r.gSem.Acquire(ctx, 1); err != nil {
r.releaseL(token)
return err
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -137,6 +138,11 @@ func (r *Runner) acquire(ctx context.Context, token string) error {
// release gives back one unit of the global semaphore and then releases
// the local semaphore for the given token via releaseL.
func (r *Runner) release(token string) {
// Return the single unit held on the global semaphore.
r.gSem.Release(1)

// Release the local semaphore associated with token.
r.releaseL(token)
}

// releaseL releases local semaphore for given token.
func (r *Runner) releaseL(token string) {
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved
if token != "" {
r.lSemsM.Lock()

Expand All @@ -150,6 +156,13 @@ func (r *Runner) release(token string) {
}
}

// lSemsLen reports how many local semaphores are currently stored in r.lSems.
// The count is read while holding r.lSemsM.
func (r *Runner) lSemsLen() int {
	r.lSemsM.Lock()
	count := len(r.lSems)
	r.lSemsM.Unlock()

	return count
}

// Run starts jobs execution loop. It reads jobs from the channel and starts them in separate goroutines.
func (r *Runner) Run(ctx context.Context) {
for {
Expand Down
129 changes: 81 additions & 48 deletions agent/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,43 +149,43 @@ func TestPerDBInstanceLimit(t *testing.T) {
defer cancel()
go cr.Run(ctx)

j1db1 := testJob{id: "test-1", timeout: time.Second, dsn: "postgresql://db1"}
j2db1 := testJob{id: "test-2", timeout: time.Second, dsn: "postgresql://db1"}
j3db1 := testJob{id: "test-3", timeout: time.Second, dsn: "postgresql://db1"}
j1db2 := testJob{id: "test-4", timeout: time.Second, dsn: "postgresql://db2"}
j2db2 := testJob{id: "test-5", timeout: time.Second, dsn: "postgresql://db2"}
j3db2 := testJob{id: "test-6", timeout: time.Second, dsn: "postgresql://db2"}
db1j1 := testJob{id: "test-1", timeout: time.Second, dsn: "postgresql://db1"}
db1j2 := testJob{id: "test-2", timeout: time.Second, dsn: "postgresql://db1"}
db1j3 := testJob{id: "test-3", timeout: time.Second, dsn: "postgresql://db1"}
db2j1 := testJob{id: "test-4", timeout: time.Second, dsn: "postgresql://db2"}
db2j2 := testJob{id: "test-5", timeout: time.Second, dsn: "postgresql://db2"}
db2j3 := testJob{id: "test-6", timeout: time.Second, dsn: "postgresql://db2"}

require.NoError(t, cr.StartJob(j1db1))
require.NoError(t, cr.StartJob(j1db2))
require.NoError(t, cr.StartJob(db1j1))
require.NoError(t, cr.StartJob(db2j1))

// Let jobs to start
time.Sleep(200 * time.Millisecond)

require.NoError(t, cr.StartJob(j2db1))
require.NoError(t, cr.StartJob(j2db2))
require.NoError(t, cr.StartJob(j3db1))
require.NoError(t, cr.StartJob(j3db2))
require.NoError(t, cr.StartJob(db1j2))
require.NoError(t, cr.StartJob(db2j2))
require.NoError(t, cr.StartJob(db1j3))
require.NoError(t, cr.StartJob(db2j3))

// Let rest jobs to reach semaphores
time.Sleep(300 * time.Millisecond)

assert.True(t, cr.IsRunning(j1db1.ID()))
assert.True(t, cr.IsRunning(j1db2.ID()))
assert.False(t, cr.IsRunning(j2db1.ID()))
assert.False(t, cr.IsRunning(j2db2.ID()))
assert.False(t, cr.IsRunning(j3db1.ID()))
assert.False(t, cr.IsRunning(j3db2.ID()))
assert.True(t, cr.IsRunning(db1j1.ID()))
assert.True(t, cr.IsRunning(db2j1.ID()))
assert.False(t, cr.IsRunning(db1j2.ID()))
assert.False(t, cr.IsRunning(db2j2.ID()))
assert.False(t, cr.IsRunning(db1j3.ID()))
assert.False(t, cr.IsRunning(db2j3.ID()))

// Over time all jobs are terminated
time.Sleep(time.Second)

assert.False(t, cr.IsRunning(j1db1.ID()))
assert.False(t, cr.IsRunning(j1db2.ID()))
assert.False(t, cr.IsRunning(j2db1.ID()))
assert.False(t, cr.IsRunning(j2db2.ID()))
assert.False(t, cr.IsRunning(j3db1.ID()))
assert.False(t, cr.IsRunning(j3db2.ID()))
assert.False(t, cr.IsRunning(db1j1.ID()))
assert.False(t, cr.IsRunning(db2j1.ID()))
assert.False(t, cr.IsRunning(db1j2.ID()))
assert.False(t, cr.IsRunning(db2j2.ID()))
assert.False(t, cr.IsRunning(db1j3.ID()))
assert.False(t, cr.IsRunning(db2j3.ID()))
}

func TestDefaultPerDBInstanceLimit(t *testing.T) {
Expand All @@ -196,43 +196,43 @@ func TestDefaultPerDBInstanceLimit(t *testing.T) {
defer cancel()
go cr.Run(ctx)

j1db1 := testJob{id: "test-1", timeout: time.Second, dsn: "postgresql://db1"}
j2db1 := testJob{id: "test-2", timeout: time.Second, dsn: "postgresql://db1"}
j3db1 := testJob{id: "test-3", timeout: time.Second, dsn: "postgresql://db1"}
j1db2 := testJob{id: "test-4", timeout: time.Second, dsn: "postgresql://db2"}
j2db2 := testJob{id: "test-5", timeout: time.Second, dsn: "postgresql://db2"}
j3db2 := testJob{id: "test-6", timeout: time.Second, dsn: "postgresql://db2"}
db1j1 := testJob{id: "test-1", timeout: time.Second, dsn: "postgresql://db1"}
db1j2 := testJob{id: "test-2", timeout: time.Second, dsn: "postgresql://db1"}
db1j3 := testJob{id: "test-3", timeout: time.Second, dsn: "postgresql://db1"}
db2j1 := testJob{id: "test-4", timeout: time.Second, dsn: "postgresql://db2"}
db2j2 := testJob{id: "test-5", timeout: time.Second, dsn: "postgresql://db2"}
db2j3 := testJob{id: "test-6", timeout: time.Second, dsn: "postgresql://db2"}

require.NoError(t, cr.StartJob(j1db1))
require.NoError(t, cr.StartJob(j1db2))
require.NoError(t, cr.StartJob(j2db1))
require.NoError(t, cr.StartJob(j2db2))
require.NoError(t, cr.StartJob(db1j1))
require.NoError(t, cr.StartJob(db2j1))
require.NoError(t, cr.StartJob(db1j2))
require.NoError(t, cr.StartJob(db2j2))

// Let jobs to start
time.Sleep(200 * time.Millisecond)

require.NoError(t, cr.StartJob(j3db1))
require.NoError(t, cr.StartJob(j3db2))
require.NoError(t, cr.StartJob(db1j3))
require.NoError(t, cr.StartJob(db2j3))

// Let rest jobs to reach semaphores
time.Sleep(300 * time.Millisecond)

assert.True(t, cr.IsRunning(j1db1.ID()))
assert.True(t, cr.IsRunning(j1db2.ID()))
assert.True(t, cr.IsRunning(j2db1.ID()))
assert.True(t, cr.IsRunning(j2db2.ID()))
assert.False(t, cr.IsRunning(j3db1.ID()))
assert.False(t, cr.IsRunning(j3db2.ID()))
assert.True(t, cr.IsRunning(db1j1.ID()))
assert.True(t, cr.IsRunning(db2j1.ID()))
assert.True(t, cr.IsRunning(db1j2.ID()))
assert.True(t, cr.IsRunning(db2j2.ID()))
assert.False(t, cr.IsRunning(db1j3.ID()))
assert.False(t, cr.IsRunning(db2j3.ID()))

// Over time all jobs are terminated
time.Sleep(time.Second)

assert.False(t, cr.IsRunning(j1db1.ID()))
assert.False(t, cr.IsRunning(j1db2.ID()))
assert.False(t, cr.IsRunning(j2db1.ID()))
assert.False(t, cr.IsRunning(j2db2.ID()))
assert.False(t, cr.IsRunning(j3db1.ID()))
assert.False(t, cr.IsRunning(j3db2.ID()))
assert.False(t, cr.IsRunning(db1j1.ID()))
assert.False(t, cr.IsRunning(db2j1.ID()))
assert.False(t, cr.IsRunning(db1j2.ID()))
assert.False(t, cr.IsRunning(db2j2.ID()))
assert.False(t, cr.IsRunning(db1j3.ID()))
assert.False(t, cr.IsRunning(db2j3.ID()))
}

func TestConcurrentRunnerTimeout(t *testing.T) {
Expand Down Expand Up @@ -329,6 +329,39 @@ func TestConcurrentRunnerCancel(t *testing.T) {
assert.Empty(t, cr.cancels)
}

// TestSemaphoresReleasing checks that a job which acquired its local semaphore
// but was cancelled while waiting for the global semaphore releases the local
// semaphore again.
func TestSemaphoresReleasing(t *testing.T) {
	t.Parallel()
	cr := New(1, 1)
	err := cr.gSem.Acquire(context.TODO(), 1) // Acquire the global semaphore to block all jobs.
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go cr.Run(ctx)

	j := testJob{id: "test-1", timeout: time.Second, dsn: "test"}

	require.NoError(t, cr.StartJob(j))

	// Give the job time to be picked up.
	time.Sleep(200 * time.Millisecond)

	// The job has been picked up and holds its local semaphore.
	// testify expects (expected, actual); keeping that order makes failure output read correctly.
	assert.Equal(t, 1, cr.lSemsLen())

	// The job is not running yet: it is still waiting for the global semaphore.
	assert.False(t, cr.IsRunning(j.ID()))

	// Cancel the context to stop the job.
	cancel()

	// Give the job time to observe the cancellation and release its resources.
	time.Sleep(200 * time.Millisecond)

	// The local semaphore must have been released.
	assert.Equal(t, 0, cr.lSemsLen())
}

type testJob struct {
id string
timeout time.Duration
Expand Down
Loading