Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[no-release-notes] go: sqle: dprocedures: dolt_gc: Implement a session-aware GC safepoint controller. #8798

Merged
merged 14 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ SysbenchDockerfile.dockerignore
sysbench-runner-tests-entrypoint.sh
config.json
integration-tests/bats/batsee_results

*~
.dir-locals.el
10 changes: 7 additions & 3 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler"
Expand Down Expand Up @@ -124,12 +125,15 @@ func NewSqlEngine(
locations = append(locations, nil)
}

gcSafepointController := dsess.NewGCSafepointController()

b := env.GetDefaultInitBranch(mrEnv.Config())
pro, err := dsqle.NewDoltDatabaseProviderWithDatabases(b, mrEnv.FileSystem(), all, locations)
if err != nil {
return nil, err
}
pro = pro.WithRemoteDialer(mrEnv.RemoteDialProvider())
pro.RegisterProcedure(dprocedures.NewDoltGCProcedure(gcSafepointController))

config.ClusterController.RegisterStoredProcedures(pro)
if config.ClusterController != nil {
Expand Down Expand Up @@ -189,7 +193,7 @@ func NewSqlEngine(
engine.Analyzer.Catalog.StatsProvider = statsPro

engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{})
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, config.Autocommit)
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit)
sqlEngine.provider = pro
sqlEngine.contextFactory = sqlContextFactory()
sqlEngine.dsessFactory = sessFactory
Expand Down Expand Up @@ -413,9 +417,9 @@ func sqlContextFactory() contextFactory {
}

// doltSessionFactory returns a sessionFactory that creates a new DoltSession
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory {
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, gcSafepointController *dsess.GCSafepointController, autocommit bool) sessionFactory {
return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) {
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro, writer.NewWriteSession)
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro, writer.NewWriteSession, gcSafepointController)
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,13 @@ func ConfigureServices(
AutoStartBinlogReplica := &svcs.AnonService{
InitF: func(ctx context.Context) error {
// If we're unable to restart replication, log an error, but don't prevent the server from starting up
if err := binlogreplication.DoltBinlogReplicaController.AutoStart(ctx); err != nil {
sqlCtx, err := sqlEngine.NewDefaultContext(ctx)
if err != nil {
logrus.Errorf("unable to restart replication, could not create session: %s", err.Error())
return nil
}
defer sql.SessionEnd(sqlCtx.Session)
if err := binlogreplication.DoltBinlogReplicaController.AutoStart(sqlCtx); err != nil {
logrus.Errorf("unable to restart replication: %s", err.Error())
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions go/libraries/doltcore/dconfig/envvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ const (
EnvDbNameReplace = "DOLT_DBNAME_REPLACE"
EnvDoltRootHost = "DOLT_ROOT_HOST"
EnvDoltRootPassword = "DOLT_ROOT_PASSWORD"

// EnvGCSafepointControllerChoice, if set, must be "kill_connections" or "session_aware".
// It will go away after session_aware is made the default and only choice.
EnvGCSafepointControllerChoice = "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE"
)
39 changes: 39 additions & 0 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,45 @@ func (ddb *DoltDB) writeRootValue(ctx context.Context, rv RootValue) (RootValue,
return rv, ref, nil
}

// Persists all relevant root values of the WorkingSet to the database and returns all hashes reachable
// from the working set. This is used in GC, for example, where all dependencies of the in-memory working
// set value need to be accounted for.
func (ddb *DoltDB) WorkingSetHashes(ctx context.Context, ws *WorkingSet) ([]hash.Hash, error) {
spec, err := ws.writeValues(ctx, ddb, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic in this method seems like it would be more naturally contained by the WorkingSet type

if err != nil {
return nil, err
}
ret := make([]hash.Hash, 0)
ret = append(ret, spec.StagedRoot.TargetHash())
ret = append(ret, spec.WorkingRoot.TargetHash())
if spec.MergeState != nil {
fromCommit, err := spec.MergeState.FromCommit(ctx, ddb.vrw)
if err != nil {
return nil, err
}
h, err := fromCommit.NomsValue().Hash(ddb.db.Format())
if err != nil {
return nil, err
}
ret = append(ret, h)
h, err = spec.MergeState.PreMergeWorkingAddr(ctx, ddb.vrw)
ret = append(ret, h)
}
if spec.RebaseState != nil {
ret = append(ret, spec.RebaseState.PreRebaseWorkingAddr())
commit, err := spec.RebaseState.OntoCommit(ctx, ddb.vrw)
if err != nil {
return nil, err
}
h, err := commit.NomsValue().Hash(ddb.db.Format())
if err != nil {
return nil, err
}
ret = append(ret, h)
}
return ret, nil
}

// ReadRootValue reads the RootValue associated with the hash given and returns it. Returns an error if the value cannot
// be read, or if the hash given doesn't represent a dolt RootValue.
func (ddb *DoltDB) ReadRootValue(ctx context.Context, h hash.Hash) (RootValue, error) {
Expand Down
1 change: 0 additions & 1 deletion go/libraries/doltcore/doltdb/workingset.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,6 @@ func (ws *WorkingSet) writeValues(ctx context.Context, db *DoltDB, meta *datas.W
return nil, fmt.Errorf("StagedRoot and workingRoot must be set. This is a bug.")
}

var r RootValue
r, workingRoot, err := db.writeRootValue(ctx, ws.workingRoot)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ func persistReplicaRunningState(ctx *sql.Context, state replicaRunningState) err
// loadReplicationConfiguration loads the replication configuration for default channel ("") from
// the "mysql" database, |mysqlDb|.
func loadReplicationConfiguration(ctx *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
rd := mysqlDb.Reader()
defer rd.Close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
var conn *mysql.Conn
var err error
for connectionAttempts := uint64(0); ; connectionAttempts++ {
sql.SessionCommandBegin(ctx.Session)
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)

sql.SessionCommandEnd(ctx.Session)
if replicaSourceInfo == nil {
err = ErrServerNotConfiguredAsReplica
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package binlogreplication

import (
"context"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -158,8 +157,6 @@ func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
// created and locked to disable log ins, and if it does exist, but is missing super privs or is not
// locked, it will be given superuser privs and locked.
func (d *doltBinlogReplicaController) configureReplicationUser(ctx *sql.Context) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
mySQLDb := d.engine.Analyzer.Catalog.MySQLDb
ed := mySQLDb.Editor()
defer ed.Close()
Expand Down Expand Up @@ -417,8 +414,10 @@ func (d *doltBinlogReplicaController) setSqlError(errno uint, message string) {
// replication is not configured, hasn't been started, or has been stopped before the server was
// shutdown, then this method will not start replication. This method should only be called during
// the server startup process and should not be invoked after that.
func (d *doltBinlogReplicaController) AutoStart(_ context.Context) error {
runningState, err := loadReplicationRunningState(d.ctx)
func (d *doltBinlogReplicaController) AutoStart(ctx *sql.Context) error {
sql.SessionCommandBegin(ctx.Session)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noted this on the other PR, I don't think we are generally very disciplined about sql session lifecycle management. Maybe it only matters in a few key places for GC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but we will have to get better where it matters...

defer sql.SessionCommandEnd(ctx.Session)
runningState, err := loadReplicationRunningState(ctx)
if err != nil {
logrus.Errorf("Unable to load replication running state: %s", err.Error())
return err
Expand All @@ -430,7 +429,7 @@ func (d *doltBinlogReplicaController) AutoStart(_ context.Context) error {
}

logrus.Info("auto-starting binlog replication from source...")
return d.StartReplica(d.ctx)
return d.StartReplica(ctx)
}

// Release all resources, such as replication threads, associated with the replication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,12 @@ func stopDoltSqlServer(t *testing.T) {
}

// startReplication configures the replication source on the replica and runs the START REPLICA statement.
func startReplication(_ *testing.T, port int) {
func startReplication(t *testing.T, port int) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("failed to start replication and caught a panic: %v", r)
}
}()
replicaDatabase.MustExec(
fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+
"SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+
Expand Down
4 changes: 4 additions & 0 deletions go/libraries/doltcore/sqle/database_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (p *DoltDatabaseProvider) WithFunctions(fns []sql.Function) *DoltDatabasePr
return &cp
}

// RegisterProcedure registers |procedure| with this provider by passing it to
// p.externalProcedures.Register. It has no return value: unlike the With* methods on
// DoltDatabaseProvider, it does not hand back a copy of the provider.
// NOTE(review): presumably this mutates the provider's procedure registry in place and should be
// called during setup, before the provider is shared — confirm against the registry implementation.
func (p *DoltDatabaseProvider) RegisterProcedure(procedure sql.ExternalStoredProcedureDetails) {
p.externalProcedures.Register(procedure)
}

// WithDbFactoryUrl returns a copy of this provider with the DbFactoryUrl set as provided.
// The URL is used when creating new databases.
// See doltdb.InMemDoltDB, doltdb.LocalDirDoltDB
Expand Down
Loading