Skip to content

Commit

Permalink
Merge pull request #129 from hyperledger/fix-127
Browse files Browse the repository at this point in the history
Do not keep dispatching after block listener falls behind
  • Loading branch information
matthew1001 authored Sep 3, 2024
2 parents 293e998 + 9ec77ad commit 97dc2d9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
8 changes: 4 additions & 4 deletions internal/confirmations/confirmed_block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ func (cbl *confirmedBlockListener) processBlockNotification(block *apitypes.Bloc
var dispatchHead *apitypes.BlockInfo
if len(cbl.newHeadToAdd) > 0 {
// we've snuck in multiple notifications while the dispatcher is busy... don't add indefinitely to this list
if len(cbl.newHeadToAdd) > 10 /* not considered worth adding/explaining a tuning property for this */ {
if len(cbl.newHeadToAdd) >= 10 /* not considered worth adding/explaining a tuning property for this */ {
log.L(cbl.ctx).Infof("Block listener fell behind head of chain")
cbl.newHeadToAdd = nil
} else {
dispatchHead = cbl.newHeadToAdd[len(cbl.newHeadToAdd)-1]
// Nothing more we can do in this function until it catches up - we just have to discard the notification
return
}
dispatchHead = cbl.newHeadToAdd[len(cbl.newHeadToAdd)-1]
}
if dispatchHead == nil && len(cbl.blocksSinceCheckpoint) > 0 {
dispatchHead = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1]
Expand Down
44 changes: 23 additions & 21 deletions internal/confirmations/confirmed_block_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package confirmations

import (
"context"
"fmt"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -338,38 +339,39 @@ func TestCBLDispatcherFallsBehindHead(t *testing.T) {
mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything)
mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) })

// We'll fall back to this because we don't keep up
mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything)
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

bcm.requiredConfirmations = 5

// Start a CBL, but then cancel the dispatcher
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
assert.NoError(t, err)
cbl.cancelFunc()
<-cbl.dispatcherDone
<-cbl.processorDone

// Restart just the processor
cbl.ctx, cbl.cancelFunc = context.WithCancel(bcm.ctx)
cbl.processorDone = make(chan struct{})
go cbl.notificationProcessor()

// Notify all the blocks before we process any
// Notify all the blocks
for i := 0; i < len(blocks); i++ {
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
BlockHashes: []string{blocks[i].BlockHash},
})
}

for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ {
// The dispatches should have been added, until it got too far ahead
// and then set to nil.
for cbl.newHeadToAdd != nil {
time.Sleep(1 * time.Millisecond)
}
b := <-esDispatch
assert.Equal(t, b.BlockEvent.BlockNumber.Uint64(), blocks[i].BlockNumber.Uint64())
assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo)
}
// Wait until there's a first tap to the dispatcher, meaning
// dispatches should have been added to the newHeadToAdd list
<-cbl.dispatcherTap

time.Sleep(1 * time.Millisecond)
assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations)
select {
case <-esDispatch:
assert.Fail(t, "should not have received block in confirmation window")
default: // good - we should have the confirmations sat there, but no dispatch
// ... until it got too far ahead and then set to nil.
checkHeadLen := func() int {
cbl.stateLock.Lock()
defer cbl.stateLock.Unlock()
return len(cbl.newHeadToAdd)
}
for checkHeadLen() < 10 {
time.Sleep(1 * time.Millisecond)
}

bcm.Stop()
Expand Down

0 comments on commit 97dc2d9

Please sign in to comment.