Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abci: Add relaxed local client synchronization models (backport #1141) #2

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[proxy]` Expand `ClientCreator` interface to allow
for per-"connection" control of client creation
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[abci/client]` Add consensus-synchronized local client creator,
which only imposes a mutex on the consensus "connection", leaving
the concurrency of all other "connections" up to the application
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
3 changes: 3 additions & 0 deletions .changelog/v0.38.0/improvements/1141-abci-unsync-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[abci/client]` Add fully unsynchronized local client creator, which
imposes no mutexes on the application, leaving all handling of concurrency up
to the application ([\#1141](https://github.com/cometbft/cometbft/pull/1141))
16 changes: 10 additions & 6 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ type localClient struct {

var _ Client = (*localClient)(nil)

// NewLocalClient creates a local client, which wraps the application interface that
// Tendermint as the client will call to the application as the server. The only
// difference, is that the local client has a global mutex which enforces serialization
// of all the ABCI calls from Tendermint to the Application.
// NewLocalClient creates a local client, which wraps the application interface
// that Comet as the client will call to the application as the server.
//
// Concurrency control in each client instance is enforced by way of a single
// mutex. If a mutex is not supplied (i.e. if mtx is nil), then one will be
// created.
func NewLocalClient(mtx *cmtsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(cmtsync.Mutex)
Expand Down Expand Up @@ -135,15 +137,17 @@ func (app *localClient) OfferSnapshot(ctx context.Context, req *types.RequestOff
}

func (app *localClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *localClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

Expand Down
136 changes: 136 additions & 0 deletions abci/client/unsync_local_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package abcicli

import (
"context"
"sync"

types "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/service"
)

type unsyncLocalClient struct {
service.BaseService

types.Application

mtx sync.Mutex
Callback
}

var _ Client = (*unsyncLocalClient)(nil)

// NewUnsyncLocalClient creates a local client, which wraps the application
// interface that Comet as the client will call to the application as the
// server.
//
// This differs from [NewLocalClient] in that it returns a client that only
// maintains a mutex over the callback used by CheckTxAsync and not over the
// application, leaving it up to the proxy to handle all concurrency. If the
// proxy does not impose any concurrency restrictions, it is then left up to
// the application to implement its own concurrency for the relevant group of
// calls.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}

func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
app.Callback = cb
app.mtx.Unlock()
}

func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

//-------------------------------------------------------

func (app *unsyncLocalClient) Error() error {
return nil
}

func (app *unsyncLocalClient) Flush(context.Context) error {
return nil
}

func (app *unsyncLocalClient) Echo(_ context.Context, msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}

func (app *unsyncLocalClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
return app.Application.Info(ctx, req)
}

func (app *unsyncLocalClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return app.Application.CheckTx(ctx, req)
}

func (app *unsyncLocalClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return app.Application.Query(ctx, req)
}

func (app *unsyncLocalClient) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx, req)
}

func (app *unsyncLocalClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
return app.Application.InitChain(ctx, req)
}

func (app *unsyncLocalClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
return app.Application.ListSnapshots(ctx, req)
}

func (app *unsyncLocalClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
return app.Application.OfferSnapshot(ctx, req)
}

func (app *unsyncLocalClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
return app.Application.ApplySnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
return app.Application.PrepareProposal(ctx, req)
}

func (app *unsyncLocalClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
return app.Application.ProcessProposal(ctx, req)
}

func (app *unsyncLocalClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) {
return app.Application.ExtendVote(ctx, req)
}

func (app *unsyncLocalClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
return app.Application.VerifyVoteExtension(ctx, req)
}

func (app *unsyncLocalClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return app.Application.FinalizeBlock(ctx, req)
}
2 changes: 1 addition & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newMockProxyApp(finalizeBlockResponse *abci.ResponseFinalizeBlock) proxy.Ap
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
finalizeBlockResponse: finalizeBlockResponse,
})
cli, _ := clientCreator.NewABCIClient()
cli, _ := clientCreator.NewABCIConsensusClient()
err := cli.Start()
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) {
}

func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*CListMempool, cleanupFunc) {
appConnMem, _ := cc.NewABCIClient()
appConnMem, _ := cc.NewABCIMempoolClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
err := appConnMem.Start()
if err != nil {
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestSerialReap(t *testing.T) {
mp, cleanup := newMempoolWithApp(cc)
defer cleanup()

appConnCon, _ := cc.NewABCIClient()
appConnCon, _ := cc.NewABCIConsensusClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err := appConnCon.Start()
require.Nil(t, err)
Expand Down Expand Up @@ -629,7 +629,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NoError(t, err)
assert.EqualValues(t, 10, mp.SizeBytes())

appConnCon, _ := cc.NewABCIClient()
appConnCon, _ := cc.NewABCIConsensusClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err = appConnCon.Start()
require.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions proxy/app_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestEcho(t *testing.T) {
})

// Start client
cli, err := clientCreator.NewABCIClient()
cli, err := clientCreator.NewABCIMempoolClient()
if err != nil {
t.Fatalf("Error creating ABCI client: %v", err.Error())
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func BenchmarkEcho(b *testing.B) {
})

// Start client
cli, err := clientCreator.NewABCIClient()
cli, err := clientCreator.NewABCIMempoolClient()
if err != nil {
b.Fatalf("Error creating ABCI client: %v", err.Error())
}
Expand Down
Loading
Loading