Skip to content

Commit

Permalink
change paradigm from checking for missing childrent to keeping track …
Browse files Browse the repository at this point in the history
…of tip via electrs

prior, to solve for forks we would try to fill in "missing children".  the way we (I) had written it, there is no way to guarantee that we will get to the tip; if there were siblings at that height then we would not check for missing blocks.

a better way to do it it to check for the tip of the btc chain to change; when that changes, update the btc chain until we get to a known block
  • Loading branch information
ClaytonNorthey92 committed Jan 14, 2025
1 parent 54ce612 commit 302b5f8
Showing 4 changed files with 48 additions and 241 deletions.
1 change: 0 additions & 1 deletion database/bfgd/database.go
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@ type Database interface {
BtcBlockInsert(ctx context.Context, bb *BtcBlock) error
BtcBlockByHash(ctx context.Context, hash [32]byte) (*BtcBlock, error)
BtcBlockHeightByHash(ctx context.Context, hash [32]byte) (uint64, error)
BtcBlocksHeightsWithNoChildren(ctx context.Context) ([]uint64, error)

// Pop data
PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte, excludeUnconfirmed bool, page uint32) ([]PopBasis, error)
162 changes: 0 additions & 162 deletions database/bfgd/database_ext_test.go
Original file line number Diff line number Diff line change
@@ -1752,168 +1752,6 @@ func TestL2BtcFinalitiesByL2KeystoneNotPublishedHeight(t *testing.T) {
}
}

func TestBtcHeightsNoChildren(t *testing.T) {
type testTableItem struct {
name string
numberToCreateWithChildren int
numberToCreateWithNoChildren int
overlapCount int
}

testTable := []testTableItem{
{
name: "0",
numberToCreateWithNoChildren: 0,
numberToCreateWithChildren: 43,
},
{
name: "less than 100",
numberToCreateWithNoChildren: 76,
numberToCreateWithChildren: 4,
},
{
name: "more than 100",
numberToCreateWithNoChildren: 126,
numberToCreateWithChildren: 333,
},
{
name: "more than 100 and overlap",
numberToCreateWithNoChildren: 126,
numberToCreateWithChildren: 333,
overlapCount: 98,
},
}

createBlocksWithNoChildren := func(ctx context.Context, count int, db bfgd.Database) []int64 {
heights := make([]int64, count)
for i := range count {
height := mathrand.Int64()
hash := make([]byte, 32)
if _, err := rand.Read(hash); err != nil {
t.Fatal(err)
}
header := make([]byte, 80)
if _, err := rand.Read(header); err != nil {
t.Fatal(err)
}

btcBlock := bfgd.BtcBlock{
Height: uint64(height),
Hash: hash,
Header: header,
}

if err := db.BtcBlockInsert(ctx, &btcBlock); err != nil {
t.Fatal(err)
}

heights[i] = height
}

return heights
}

createBlocksWithChildren := func(ctx context.Context, count int, db bfgd.Database, avoidHeights []int64, overlapHeights []int64) []int64 {
var prevHash []byte
overlapHeightI := 0
heights := make([]int64, count)
for i := range count {
var height int64
for {
if overlapHeightI < len(overlapHeights) {
height = overlapHeights[overlapHeightI]
overlapHeightI++
break
}

height = mathrand.Int64()
if !slices.Contains(avoidHeights, height) {
break
}
}
hash := make([]byte, 32)
if _, err := rand.Read(hash); err != nil {
t.Fatal(err)
}
header := make([]byte, 80)
if _, err := rand.Read(header); err != nil {
t.Fatal(err)
}

if len(prevHash) > 0 {
for k := range 32 {
header[k+4] = prevHash[k]
}
}

btcBlock := bfgd.BtcBlock{
Height: uint64(height),
Hash: hash,
Header: header,
}

if err := db.BtcBlockInsert(ctx, &btcBlock); err != nil {
t.Fatal(err)
}
prevHash = hash
heights[i] = height
}
return heights
}

for _, tti := range testTable {
t.Run(tti.name, func(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

var overlapHeights []int64
noChildrenHeights := createBlocksWithNoChildren(ctx, tti.numberToCreateWithNoChildren, db)

childrenHeights := createBlocksWithChildren(ctx, tti.numberToCreateWithChildren, db, nil, overlapHeights)

if tti.overlapCount > 0 {
overlapHeights = noChildrenHeights[:tti.overlapCount]
oldChildrenHeights := childrenHeights
for _, o := range oldChildrenHeights {
if !slices.Contains(overlapHeights, o) {
childrenHeights = append(childrenHeights, o)
}
}
}

heights, err := db.BtcBlocksHeightsWithNoChildren(ctx)
if err != nil {
t.Fatal(err)
}

toCmp := make([]uint64, len(noChildrenHeights)+1)
for i, c := range noChildrenHeights {
toCmp[i] = uint64(c)
}
toCmp[len(toCmp)-1] = uint64(childrenHeights[len(childrenHeights)-1])

slices.Sort(heights)
slices.Sort(toCmp)

// we return a nil slice if emtpy, change that here for deep.Equal
if len(heights) == 0 {
heights = []uint64{}
}

if diff := deep.Equal(toCmp[:len(toCmp)-1], heights); len(diff) != 0 {
t.Fatalf("unexpected diff %s", diff)
}
})
}
}

type BtcTransactionBroadcastRequest struct {
TxId string
SerializedTx []byte
51 changes: 0 additions & 51 deletions database/bfgd/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -695,57 +695,6 @@ func (p *pgdb) BtcBlockCanonicalHeight(ctx context.Context) (uint64, error) {
return result, nil
}

// BtcBlocksHeightsWithNoChildren returns the heights of blocks stored in the
// database that do not have any children, these represent possible forks that
// have not been handled yet.
func (p *pgdb) BtcBlocksHeightsWithNoChildren(ctx context.Context) ([]uint64, error) {
log.Tracef("BtcBlocksHeightsWithNoChildren")
defer log.Tracef("BtcBlocksHeightsWithNoChildren exit")

// Query all heights from btc_blocks where the block does not have any
// children and there are no other blocks at the same height with children.
// Excludes the tip because it will not have any children.
const q = `
SELECT height FROM btc_blocks bb1
WHERE NOT EXISTS (SELECT * FROM btc_blocks bb2 WHERE substr(bb2.header, 5, 32) = bb1.hash)
AND NOT EXISTS (
SELECT * FROM btc_blocks bb3 WHERE bb1.height = bb3.height
AND EXISTS (
SELECT * FROM btc_blocks bb4 WHERE substr(bb4.header, 5, 32) = bb3.hash
)
)
ORDER BY height DESC
OFFSET $1 + 1
LIMIT 100
`

var heights []uint64
for offset := 0; ; offset += 100 {
rows, err := p.db.QueryContext(ctx, q, offset)
if err != nil {
return nil, err
}
defer rows.Close()

startingLength := len(heights)
for rows.Next() {
var v uint64
if err := rows.Scan(&v); err != nil {
return nil, err
}
heights = append(heights, v)
}

if startingLength == len(heights) {
return heights, nil
}

if rows.Err() != nil {
return nil, rows.Err()
}
}
}

func (p *pgdb) refreshBTCBlocksCanonical(ctx context.Context) error {
// XXX this probably should be REFRESH MATERIALIZED VIEW CONCURRENTLY
// however, this is more testable at the moment and we're in a time crunch,
75 changes: 48 additions & 27 deletions service/bfg/bfg.go
Original file line number Diff line number Diff line change
@@ -156,7 +156,7 @@ type Server struct {
// if this grows we need to notify subscribers
canonicalChainHeight uint64

checkForInvalidBlocks chan struct{}
checkForTipChange chan struct{}

l2keystonesCache []hemi.L2Keystone

@@ -254,17 +254,17 @@ func NewServer(cfg *Config) (*Server, error) {
)
}
s := &Server{
cfg: cfg,
requestLimiter: make(chan bool, cfg.RequestLimit),
btcHeight: cfg.BTCStartHeight,
server: http.NewServeMux(),
publicServer: http.NewServeMux(),
metrics: newMetrics(cfg),
sessions: make(map[string]*bfgWs),
checkForInvalidBlocks: make(chan struct{}),
holdoffTimeout: 6 * time.Second,
bfgCallTimeout: 20 * time.Second,
bfgCmdCh: make(chan bfgCmd),
cfg: cfg,
requestLimiter: make(chan bool, cfg.RequestLimit),
btcHeight: cfg.BTCStartHeight,
server: http.NewServeMux(),
publicServer: http.NewServeMux(),
metrics: newMetrics(cfg),
sessions: make(map[string]*bfgWs),
checkForTipChange: make(chan struct{}),
holdoffTimeout: 6 * time.Second,
bfgCallTimeout: 20 * time.Second,
bfgCmdCh: make(chan bfgCmd),
}
for range cfg.RequestLimit {
s.requestLimiter <- true
@@ -297,33 +297,45 @@ func NewServer(cfg *Config) (*Server, error) {
return s, nil
}

func (s *Server) queueCheckForInvalidBlocks() {
func (s *Server) queueCheckForTipChange() {
select {
case s.checkForInvalidBlocks <- struct{}{}:
case s.checkForTipChange <- struct{}{}:
default:
}
}

func (s *Server) invalidBlockChecker(ctx context.Context) {
func (s *Server) tipChangeChecker(ctx context.Context) {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-s.checkForInvalidBlocks:
heights, err := s.db.BtcBlocksHeightsWithNoChildren(ctx)
case <-s.checkForTipChange:
height, err := s.btcClient.Height(ctx)
if err != nil {
log.Errorf("error trying to get heights for btc blocks: %s", err)
return
log.Errorf("error getting height: %s", err)
s.queueCheckForTipChange()
continue
}

log.Infof("received %d heights with no children, will re-check", len(heights))
for _, height := range heights {
log.Infof("reprocessing block at height %d", height)
if err := s.processBitcoinBlock(ctx, height); err != nil {
log.Tracef("checking for missing blocks, walking from tip %d to %d", height, s.cfg.BTCStartHeight)

for height > s.cfg.BTCStartHeight {
err := s.processBitcoinBlock(ctx, height, true)
if errors.Is(err, database.ErrDuplicate) {
log.Tracef("block is already found, exiting")
break
}

if err != nil {
log.Errorf("error processing bitcoin block: %s", err)
break
}

height--
}

s.queueCheckForTipChange()
}
}
}
@@ -591,7 +603,7 @@ func (s *Server) handleBitcoinUTXOs(ctx context.Context, bur *bfgapi.BitcoinUTXO
return buResp, nil
}

func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error {
func (s *Server) processBitcoinBlock(ctx context.Context, height uint64, failOnDuplicate bool) error {
log.Tracef("Processing Bitcoin block at height %d...", height)

rbh, err := s.btcClient.RawBlockHeader(ctx, height)
@@ -619,6 +631,11 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error {
// XXX don't return err here so we keep counting up, need to be smarter
if errors.Is(err, database.ErrDuplicate) {
log.Errorf("could not insert btc block: %s", err)

if failOnDuplicate {
return err
}

return nil
}
}
@@ -740,12 +757,11 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error {

func (s *Server) processBitcoinBlocks(ctx context.Context, start, end uint64) error {
for i := start; i <= end; i++ {
if err := s.processBitcoinBlock(ctx, i); err != nil {
if err := s.processBitcoinBlock(ctx, i, false); err != nil {
return fmt.Errorf("process bitcoin block at height %d: %w", i, err)
}
s.btcHeight = i
}
s.queueCheckForInvalidBlocks()
return nil
}

@@ -794,6 +810,11 @@ func (s *Server) trackBitcoin(ctx context.Context) {
log.Errorf("Failed to process Bitcoin blocks: %v", err)
continue
}

// once we process up the chain once, in the future let's process
// down from the tip in case it changes
s.queueCheckForTipChange()
return
}
}
}
@@ -1981,7 +2002,7 @@ func (s *Server) Run(pctx context.Context) error {
go s.trackBitcoin(ctx)

s.wg.Add(1)
go s.invalidBlockChecker(ctx)
go s.tipChangeChecker(ctx)

select {
case <-ctx.Done():

0 comments on commit 302b5f8

Please sign in to comment.