Skip to content

Commit

Permalink
Merge pull request #888 from yyforyongyu/batch-client-mempool
Browse files Browse the repository at this point in the history
chain: use batch rpc client in mempool poller
  • Loading branch information
guggero authored Oct 12, 2023
2 parents fe40c37 + 4acd62b commit afbabbe
Show file tree
Hide file tree
Showing 22 changed files with 1,115 additions and 279 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ clean:
@$(call print, "Cleaning source.$(NC)")
$(RM) coverage.txt

tidy-module:
echo "Running 'go mod tidy' for all modules"
scripts/tidy_modules.sh

tidy-module-check: tidy-module
if test -n "$$(git status --porcelain)"; then echo "modules not updated, please run `make tidy-module` again!"; git status; exit 1; fi

.PHONY: all \
default \
build \
Expand Down
7 changes: 6 additions & 1 deletion chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
return nil, err
}

batchClient, err := rpcclient.NewBatch(clientCfg)
if err != nil {
return nil, err
}

// Verify that the node is running on the expected network.
net, err := getCurrentNet(client)
if err != nil {
Expand Down Expand Up @@ -182,7 +187,7 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
quit: make(chan struct{}),
}

bc.events, err = NewBitcoindEventSubscriber(cfg, client)
bc.events, err = NewBitcoindEventSubscriber(cfg, client, batchClient)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions chain/bitcoind_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ type BitcoindEvents interface {
}

// Ensure rpcclient.Client implements the rpcClient interface at compile time.
var _ rpcClient = (*rpcclient.Client)(nil)
var _ batchClient = (*rpcclient.Client)(nil)

// NewBitcoindEventSubscriber initialises a new BitcoinEvents object impl
// depending on the config passed.
func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
client *rpcclient.Client) (BitcoindEvents, error) {
func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
bClient batchClient) (BitcoindEvents, error) {

if cfg.PollingConfig != nil && cfg.ZMQConfig != nil {
return nil, fmt.Errorf("either PollingConfig or ZMQConfig " +
Expand All @@ -52,7 +52,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
}

pollingEvents := newBitcoindRPCPollingEvents(
cfg.PollingConfig, client,
cfg.PollingConfig, client, bClient,
)

return pollingEvents, nil
Expand All @@ -71,7 +71,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC)
return newBitcoindZMQEvents(cfg.ZMQConfig, client, bClient, hasRPC)
}

// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer
Expand Down
45 changes: 32 additions & 13 deletions chain/bitcoind_rpc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type PollingConfig struct {
// jitter by scaling TxPollingInterval with it. This value must be no
// less than 0. Default to 0, meaning no jitter will be applied.
TxPollingIntervalJitter float64

// RPCBatchSize defines the number of RPC requests to be batches before
// sending them to the bitcoind node.
RPCBatchSize uint32

// RPCBatchInterval defines the time to wait before attempting the next
// batch when the current one finishes.
RPCBatchInterval time.Duration
}

// bitcoindRPCPollingEvents delivers block and transaction notifications that
Expand Down Expand Up @@ -68,8 +76,8 @@ var _ BitcoindEvents = (*bitcoindRPCPollingEvents)(nil)

// newBitcoindRPCPollingEvents instantiates a new bitcoindRPCPollingEvents
// object.
func newBitcoindRPCPollingEvents(cfg *PollingConfig,
client *rpcclient.Client) *bitcoindRPCPollingEvents {
func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client,
bClient batchClient) *bitcoindRPCPollingEvents {

if cfg.BlockPollingInterval == 0 {
cfg.BlockPollingInterval = defaultBlockPollInterval
Expand All @@ -86,12 +94,28 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig,
cfg.TxPollingIntervalJitter = 0
}

// Create the config for mempool and attach default values if not
// configed.
mCfg := &mempoolConfig{
client: bClient,
getRawTxBatchSize: cfg.RPCBatchSize,
batchWaitInterval: cfg.RPCBatchInterval,
}

if cfg.RPCBatchSize == 0 {
mCfg.getRawTxBatchSize = DefaultGetRawTxBatchSize
}

if cfg.RPCBatchInterval == 0 {
mCfg.batchWaitInterval = DefaultBatchWaitInterval
}

return &bitcoindRPCPollingEvents{
cfg: cfg,
client: client,
txNtfns: make(chan *wire.MsgTx),
blockNtfns: make(chan *wire.MsgBlock),
mempool: newMempool(client),
mempool: newMempool(mCfg),
quit: make(chan struct{}),
}
}
Expand All @@ -116,6 +140,8 @@ func (b *bitcoindRPCPollingEvents) Start() error {

// Stop cleans up all the bitcoindRPCPollingEvents resources and goroutines.
func (b *bitcoindRPCPollingEvents) Stop() error {
b.mempool.Shutdown()

close(b.quit)
b.wg.Wait()
return nil
Expand Down Expand Up @@ -247,16 +273,9 @@ func (b *bitcoindRPCPollingEvents) txEventHandlerRPC() {
now := time.Now()

// After each ticker interval, we poll the mempool to
// check for transactions we haven't seen yet.
txs, err := b.client.GetRawMempool()
if err != nil {
log.Errorf("Unable to retrieve mempool txs: "+
"%v", err)
continue
}

// Update our local mempool with the new mempool.
newTxs := b.mempool.UpdateMempoolTxes(txs)
// check for transactions we haven't seen yet and
// update our local mempool with the new mempool.
newTxs := b.mempool.UpdateMempoolTxes()

log.Tracef("Reconciled mempool spends in %v",
time.Since(now))
Expand Down
46 changes: 33 additions & 13 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type ZMQConfig struct {
//
// TODO(yy): replace this temp config with SEQUENCE check.
PollingIntervalJitter float64

// RPCBatchSize defines the number of RPC requests to be batches before
// sending them to the bitcoind node.
RPCBatchSize uint32

// RPCBatchInterval defines the time to wait before attempting the next
// batch when the current one finishes.
RPCBatchInterval time.Duration
}

// bitcoindZMQEvents delivers block and transaction notifications that it gets
Expand Down Expand Up @@ -91,8 +99,8 @@ var _ BitcoindEvents = (*bitcoindZMQEvents)(nil)
// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind.
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit
// the mempool.
func newBitcoindZMQEvents(cfg *ZMQConfig,
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) {
func newBitcoindZMQEvents(cfg *ZMQConfig, client *rpcclient.Client,
bClient batchClient, hasRPC bool) (*bitcoindZMQEvents, error) {

// Check polling config.
if cfg.MempoolPollingInterval == 0 {
Expand Down Expand Up @@ -133,6 +141,22 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

// Create the config for mempool and attach default values if not
// configed.
mCfg := &mempoolConfig{
client: bClient,
getRawTxBatchSize: cfg.RPCBatchSize,
batchWaitInterval: cfg.RPCBatchInterval,
}

if cfg.RPCBatchSize == 0 {
mCfg.getRawTxBatchSize = DefaultGetRawTxBatchSize
}

if cfg.RPCBatchInterval == 0 {
mCfg.batchWaitInterval = DefaultBatchWaitInterval
}

zmqEvents := &bitcoindZMQEvents{
cfg: cfg,
client: client,
Expand All @@ -141,11 +165,12 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
hasPrevoutRPC: hasRPC,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
mempool: newMempool(mCfg),
quit: make(chan struct{}),
}

return zmqEvents, nil

}

// Start spins off the bitcoindZMQEvent goroutines.
Expand All @@ -168,6 +193,8 @@ func (b *bitcoindZMQEvents) Start() error {

// Stop cleans up any of the resources and goroutines held by bitcoindZMQEvents.
func (b *bitcoindZMQEvents) Stop() error {
b.mempool.Shutdown()

var returnErr error
if err := b.txConn.Close(); err != nil {
returnErr = err
Expand Down Expand Up @@ -502,16 +529,9 @@ func (b *bitcoindZMQEvents) mempoolPoller() {
now := time.Now()

// After each ticker interval, we poll the mempool to
// check for transactions we haven't seen yet.
txs, err := b.mempool.client.GetRawMempool()
if err != nil {
log.Errorf("Unable to retrieve mempool txs: "+
"%v", err)
continue
}

// Update our local mempool with the new mempool.
b.mempool.UpdateMempoolTxes(txs)
// check for transactions we haven't seen yet and
// update our local mempool with the new mempool.
b.mempool.UpdateMempoolTxes()

log.Tracef("Reconciled mempool spends in %v",
time.Since(now))
Expand Down
36 changes: 32 additions & 4 deletions chain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
Expand Down Expand Up @@ -124,8 +125,35 @@ type (
}
)

// rpcClient defines an interface that is used to interact with the RPC client.
type rpcClient interface {
GetRawMempool() ([]*chainhash.Hash, error)
GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error)
// batchClient defines an interface that is used to interact with the RPC
// client.
//
// NOTE: the client returned from `rpcclient.NewBatch` will implement this
// interface. Unlike the client from `rpcclient.New`, calling `GetRawMempool`
// on this client will block and won't return.
//
// TODO(yy): create a new type BatchClient in `rpcclient`.
type batchClient interface {
// GetRawMempoolAsync returns an instance of a type that can be used to
// get the result of the RPC at some future time by invoking the
// Receive function on the returned instance.
GetRawMempoolAsync() rpcclient.FutureGetRawMempoolResult

// GetRawTransactionAsync returns an instance of a type that can be
// used to get the result of the RPC at some future time by invoking
// the Receive function on the returned instance.
GetRawTransactionAsync(
txHash *chainhash.Hash) rpcclient.FutureGetRawTransactionResult

// Send marshalls bulk requests and sends to the server creates a
// response channel to receive the response
Send() error
}

// getRawTxReceiver defines an interface that's used to receive response from
// `GetRawTransactionAsync`.
type getRawTxReceiver interface {
// Receive waits for the Response promised by the future and returns a
// transaction given its hash.
Receive() (*btcutil.Tx, error)
}
Loading

0 comments on commit afbabbe

Please sign in to comment.