diff --git a/database/bfgd/database.go b/database/bfgd/database.go index 1434519cb..05606892c 100644 --- a/database/bfgd/database.go +++ b/database/bfgd/database.go @@ -23,9 +23,9 @@ type Database interface { // Btc block table BtcBlockInsert(ctx context.Context, bb *BtcBlock) error + BtcBlockReplace(ctx context.Context, btcBlock *BtcBlock) (int64, error) BtcBlockByHash(ctx context.Context, hash [32]byte) (*BtcBlock, error) BtcBlockHeightByHash(ctx context.Context, hash [32]byte) (uint64, error) - BtcBlocksHeightsWithNoChildren(ctx context.Context) ([]uint64, error) // Pop data PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte, excludeUnconfirmed bool, page uint32) ([]PopBasis, error) diff --git a/database/bfgd/database_ext_test.go b/database/bfgd/database_ext_test.go index f7a8eac19..e6de4cf6d 100644 --- a/database/bfgd/database_ext_test.go +++ b/database/bfgd/database_ext_test.go @@ -66,7 +66,7 @@ func createTestDB(ctx context.Context, t *testing.T) (bfgd.Database, *sql.DB, fu t.Fatalf("Failed to connect to database: %v", err) } - dbn := mathrand.IntN(9999) + dbn := mathrand.IntN(999999999) dbName := fmt.Sprintf("%v_%d", testDBPrefix, dbn) t.Logf("Creating test database %v", dbName) @@ -305,6 +305,161 @@ func (b *btcBlocksNtfn) handleBtcBlocksNotification(pctx context.Context, table } } +func TestBtcBlockReplaceNotIfSameBlock(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + btcBlock := bfgd.BtcBlock{ + Hash: fillOutBytes("MyHaSh", 32), + Header: fillOutBytes("myHeAdEr", 80), + Height: 1, + } + + rowsAffected, err := db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + t.Fatal(err) + } + + if rowsAffected != 1 { + t.Fatalf("unexpected rows affected: %d", rowsAffected) + } + + rowsAffected, err = db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + t.Fatal(err) + } + + if rowsAffected != 0 { + t.Fatalf("unexpected rows affected: %d", rowsAffected) + } + + result, err := db.BtcBlockByHash(ctx, [32]byte(btcBlock.Hash)) + if err != nil { + t.Fatal(err) + } + + if diff := deep.Equal(result, &btcBlock); len(diff) > 0 { + t.Fatalf("unexpected diff: %s", diff) + } +} + +func TestBtcBlockReplaceIfDifferentBlocks(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + btcBlock := bfgd.BtcBlock{ + Hash: fillOutBytes("MyHaSh", 32), + Header: fillOutBytes("myHeAdEr", 80), + Height: 1, + } + + rowsAffected, err := db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + t.Fatal(err) + } + + if rowsAffected != 1 { + t.Fatalf("unexpected rows affected: %d", rowsAffected) + } + + btcBlock.Hash = fillOutBytes("differenthash", 32) + btcBlock.Header = fillOutBytes("differentheader", 80) + + rowsAffected, err = db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + t.Fatal(err) + } + + if rowsAffected != 1 { + t.Fatalf("unexpected rows affected: %d", rowsAffected) + } + + result, err := db.BtcBlockByHash(ctx, [32]byte(btcBlock.Hash)) + if err != nil { + t.Fatal(err) + } + + if diff := deep.Equal(result, &btcBlock); len(diff) > 0 { + t.Fatalf("unexpected diff: %s", diff) + } +} + +func TestBtcBlockNoReplaceIfDifferentBlocksAtDifferentHeights(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + btcBlock := bfgd.BtcBlock{ + Hash: fillOutBytes("MyHaSh", 32), + Header: fillOutBytes("myHeAdEr", 80), + Height: 1, + } + + rowsAffected, err := db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + t.Fatal(err) + } + + if rowsAffected != 1 { + t.Fatalf("unexpected rows affected: %d", rowsAffected) + } + + btcBlock.Hash = fillOutBytes("differenthash", 32) + btcBlock.Header = fillOutBytes("differentheader", 80) + btcBlock.Height = 7 + + rowsAffected, err = db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + t.Fatal(err) + } + + if rowsAffected != 1 { + t.Fatalf("unexpected rows affected: %d", rowsAffected) + } + + result, err := db.BtcBlockByHash(ctx, [32]byte(fillOutBytes("MyHaSh", 32))) + if err != nil { + t.Fatal(err) + } + + if diff := deep.Equal(result, &bfgd.BtcBlock{ + Hash: fillOutBytes("MyHaSh", 32), + Header: fillOutBytes("myHeAdEr", 80), + Height: 1, + }); len(diff) > 0 { + t.Fatalf("unexpected diff: %s", diff) + } + + result, err = db.BtcBlockByHash(ctx, [32]byte(btcBlock.Hash)) + if err != nil { + t.Fatal(err) + } + + if diff := deep.Equal(result, &btcBlock); len(diff) > 0 { + t.Fatalf("unexpected diff: %s", diff) + } +} + func TestDatabaseNotification(t *testing.T) { pctx := context.Background() @@ -1408,207 +1563,6 @@ func TestPopBasisUpdateOneExistsWithNullBTCFields(t *testing.T) { } } -func TestBtcBlockGetCanonicalChain(t *testing.T) { - type testTableItem struct { - name string - onChainCount int - offChainCount int - } - - testTable := []testTableItem{ - { - name: "2 on, 1 off", - onChainCount: 2, - offChainCount: 1, - }, - { - name: "1 on, 2 off", - onChainCount: 1, - offChainCount: 2, - }, - { - name: "100 on, 99 off", - onChainCount: 100, - offChainCount: 99, - }, - } - - for _, tti := range testTable { - t.Run(tti.name, func(t *testing.T) { - ctx, cancel := defaultTestContext() - defer cancel() - - db, sdb, cleanup := createTestDB(ctx, t) - defer func() { - db.Close() - sdb.Close() - cleanup() - }() - - height := 1 - l2BlockNumber := uint32(9999) - - onChainBlocks := []bfgd.BtcBlock{} - - // create off-chain blocks - offChainBlocks := createBtcBlocksAtStartingHeight(ctx, t, db, tti.offChainCount, false, height, []byte{}, l2BlockNumber) - if len(offChainBlocks) != tti.offChainCount { - t.Fatalf("created an incorrect number of on-chain blocks %d", - len(offChainBlocks), - ) - } - - height += 10000 - l2BlockNumber += 1000 - // create on-chain blocks - onChainBlocks = createBtcBlocksAtStartingHeight(ctx, t, db, tti.onChainCount, true, height, []byte{}, l2BlockNumber) - - limit := tti.onChainCount - - bfs, err := db.L2BTCFinalityMostRecent(ctx, uint32(limit)) - if err != nil { - t.Fatal(err) - } - - if len(bfs) > limit { - t.Fatalf("bfs too long %d", len(bfs)) - } - - slices.Reverse(onChainBlocks) - - for i, block := range onChainBlocks { - if i == limit { - break - } - found := false - for k, v := range bfs { - if slices.Equal(block.Hash, v.BTCPubHeaderHash) == true { - t.Logf("found hash in result set: %s", block.Hash) - found = true - if k < len(bfs)-1 { - t.Logf("next has is %s", bfs[k+1].BTCPubHeaderHash) - } - } - } - if found == false { - t.Fatalf("could not find hash in result set: %s", block.Hash) - } - } - - for _, block := range offChainBlocks { - found := false - for _, v := range bfs { - if slices.Equal(block.Hash, v.BTCPubHeaderHash) == true { - t.Logf("found hash in result set: %s", block.Hash) - found = true - } - } - if found == true { - t.Fatalf("hash should not have been included in result set: %s", block.Hash) - } - } - }) - } -} - -func TestBtcBlockGetCanonicalChainWithForks(t *testing.T) { - type testTableItem struct { - name string - chainPattern []int - unconfirmedIndices []bool - } - - testTable := []testTableItem{ - { - name: "fork at tip", - chainPattern: []int{1, 1, 2}, - unconfirmedIndices: []bool{false, false, false, false}, - }, - { - name: "fork in middle", - chainPattern: []int{1, 2, 1}, - unconfirmedIndices: []bool{false, false, false, false}, - }, - { - name: "fork in beginning", - chainPattern: []int{2, 1, 1}, - unconfirmedIndices: []bool{false, false, false, false}, - }, - { - name: "fork in beginning with break", - chainPattern: []int{2, 1, 1, 1}, - unconfirmedIndices: []bool{false, false, true, false}, - }, - { - name: "fork in beginning with multiple breaks", - chainPattern: []int{2, 1, 1, 1, 1}, - unconfirmedIndices: []bool{false, true, false, true, false}, - }, - } - - for _, tti := range testTable { - t.Run(tti.name, func(t *testing.T) { - ctx, cancel := defaultTestContext() - defer cancel() - - db, sdb, cleanup := createTestDB(ctx, t) - defer func() { - db.Close() - sdb.Close() - cleanup() - }() - - height := 1 - - onChainBlocks := []bfgd.BtcBlock{} - - l2BlockNumber := uint32(1000) - lastHash := []byte{} - for i, blockCountAtHeight := range tti.chainPattern { - onChainBlocksTmp := createBtcBlocksAtStaticHeight(ctx, t, db, blockCountAtHeight, true, height, lastHash, l2BlockNumber) - l2BlockNumber++ - height++ - lastHash = onChainBlocksTmp[0].Hash - - if (blockCountAtHeight > 1 && i == len(tti.chainPattern)-1) == false { - onChainBlocks = append(onChainBlocks, onChainBlocksTmp[0]) - } - } - - rows, err := sdb.QueryContext(ctx, ` - SELECT hash FROM btc_blocks_can ORDER BY height DESC - `) - if err != nil { - t.Fatal(err) - } - - defer rows.Close() - - hashes := []database.ByteArray{} - - for rows.Next() { - var hash database.ByteArray - if err := rows.Scan(&hash); err != nil { - t.Fatal(err) - } - hashes = append(hashes, hash) - } - - if len(onChainBlocks) != len(hashes) { - t.Fatalf("length of onChainBlocks and pbs differs %d != %d", len(onChainBlocks), len(hashes)) - } - - slices.Reverse(onChainBlocks) - - for i := range onChainBlocks { - if slices.Equal(onChainBlocks[i].Hash, hashes[i]) == false { - t.Fatalf("hash mismatch: %s != %s", onChainBlocks[i].Hash, hashes[i]) - } - } - }) - } -} - func TestPublications(t *testing.T) { type testTableItem struct { name string @@ -1752,168 +1706,6 @@ func TestL2BtcFinalitiesByL2KeystoneNotPublishedHeight(t *testing.T) { } } -func TestBtcHeightsNoChildren(t *testing.T) { - type testTableItem struct { - name string - numberToCreateWithChildren int - numberToCreateWithNoChildren int - overlapCount int - } - - testTable := []testTableItem{ - { - name: "0", - numberToCreateWithNoChildren: 0, - numberToCreateWithChildren: 43, - }, - { - name: "less than 100", - numberToCreateWithNoChildren: 76, - numberToCreateWithChildren: 4, - }, - { - name: "more than 100", - numberToCreateWithNoChildren: 126, - numberToCreateWithChildren: 333, - }, - { - name: "more than 100 and overlap", - numberToCreateWithNoChildren: 126, - numberToCreateWithChildren: 333, - overlapCount: 98, - }, - } - - createBlocksWithNoChildren := func(ctx context.Context, count int, db bfgd.Database) []int64 { - heights := make([]int64, count) - for i := range count { - height := mathrand.Int64() - hash := make([]byte, 32) - if _, err := rand.Read(hash); err != nil { - t.Fatal(err) - } - header := make([]byte, 80) - if _, err := rand.Read(header); err != nil { - t.Fatal(err) - } - - btcBlock := bfgd.BtcBlock{ - Height: uint64(height), - Hash: hash, - Header: header, - } - - if err := db.BtcBlockInsert(ctx, &btcBlock); err != nil { - t.Fatal(err) - } - - heights[i] = height - } - - return heights - } - - createBlocksWithChildren := func(ctx context.Context, count int, db bfgd.Database, avoidHeights []int64, overlapHeights []int64) []int64 { - var prevHash []byte - overlapHeightI := 0 - heights := make([]int64, count) - for i := range count { - var height int64 - for { - if overlapHeightI < len(overlapHeights) { - height = overlapHeights[overlapHeightI] - overlapHeightI++ - break - } - - height = mathrand.Int64() - if !slices.Contains(avoidHeights, height) { - break - } - } - hash := make([]byte, 32) - if _, err := rand.Read(hash); err != nil { - t.Fatal(err) - } - header := make([]byte, 80) - if _, err := rand.Read(header); err != nil { - t.Fatal(err) - } - - if len(prevHash) > 0 { - for k := range 32 { - header[k+4] = prevHash[k] - } - } - - btcBlock := bfgd.BtcBlock{ - Height: uint64(height), - Hash: hash, - Header: header, - } - - if err := db.BtcBlockInsert(ctx, &btcBlock); err != nil { - t.Fatal(err) - } - prevHash = hash - heights[i] = height - } - return heights - } - - for _, tti := range testTable { - t.Run(tti.name, func(t *testing.T) { - ctx, cancel := defaultTestContext() - defer cancel() - - db, sdb, cleanup := createTestDB(ctx, t) - defer func() { - db.Close() - sdb.Close() - cleanup() - }() - - var overlapHeights []int64 - noChildrenHeights := createBlocksWithNoChildren(ctx, tti.numberToCreateWithNoChildren, db) - - childrenHeights := createBlocksWithChildren(ctx, tti.numberToCreateWithChildren, db, nil, overlapHeights) - - if tti.overlapCount > 0 { - overlapHeights = noChildrenHeights[:tti.overlapCount] - oldChildrenHeights := childrenHeights - for _, o := range oldChildrenHeights { - if !slices.Contains(overlapHeights, o) { - childrenHeights = append(childrenHeights, o) - } - } - } - - heights, err := db.BtcBlocksHeightsWithNoChildren(ctx) - if err != nil { - t.Fatal(err) - } - - toCmp := make([]uint64, len(noChildrenHeights)+1) - for i, c := range noChildrenHeights { - toCmp[i] = uint64(c) - } - toCmp[len(toCmp)-1] = uint64(childrenHeights[len(childrenHeights)-1]) - - slices.Sort(heights) - slices.Sort(toCmp) - - // we return a nil slice if emtpy, change that here for deep.Equal - if len(heights) == 0 { - heights = []uint64{} - } - - if diff := deep.Equal(toCmp[:len(toCmp)-1], heights); len(diff) != 0 { - t.Fatalf("unexpected diff %s", diff) - } - }) - } -} - type BtcTransactionBroadcastRequest struct { TxId string SerializedTx []byte diff --git a/database/bfgd/postgres/postgres.go b/database/bfgd/postgres/postgres.go index 8bfbe78ae..0d8d0f1cf 100644 --- a/database/bfgd/postgres/postgres.go +++ b/database/bfgd/postgres/postgres.go @@ -20,7 +20,7 @@ import ( ) const ( - bfgdVersion = 14 + bfgdVersion = 15 logLevel = "INFO" verbose = false @@ -60,13 +60,6 @@ func New(ctx context.Context, uri string) (*pgdb, error) { db: pg.DB(), } - // first, refresh the materialized view so it can be used in case it was - // never refreshed before this point - err = p.refreshBTCBlocksCanonical(ctx) - if err != nil { - return nil, err - } - return p, nil } @@ -242,6 +235,36 @@ func (p *pgdb) L2KeystonesMostRecentN(ctx context.Context, n uint32, page uint32 return ks, nil } +func (p *pgdb) BtcBlockReplace(ctx context.Context, btcBlock *bfgd.BtcBlock) (int64, error) { + log.Tracef("BtcBlockReplace") + defer log.Tracef("BtcBlockReplace exit") + + // since we are now trusting our btc client to construct the canonical chain, + // height is now unique to our btc blocks. if there is a conflict where + // the block at a given height is not what we have saved in the database (comparing header+hash) + // then we must have had a re-org so we replace + + insertSql := ` + INSERT INTO btc_blocks (hash, header, height) + VALUES ($1, $2, $3) + ON CONFLICT (height) + DO UPDATE SET hash = EXCLUDED.hash, header = EXCLUDED.header, height = EXCLUDED.height + WHERE btc_blocks.hash != EXCLUDED.hash OR btc_blocks.header != EXCLUDED.header + ` + + results, err := p.db.ExecContext(ctx, insertSql, btcBlock.Hash, btcBlock.Header, btcBlock.Height) + if err != nil { + return 0, err + } + + rowsAffected, err := results.RowsAffected() + if err != nil { + return 0, err + } + + return rowsAffected, nil +} + func (p *pgdb) BtcBlockInsert(ctx context.Context, bb *bfgd.BtcBlock) error { log.Tracef("BtcBlockInsert") defer log.Tracef("BtcBlockInsert exit") @@ -573,7 +596,7 @@ func (p *pgdb) L2BTCFinalityByL2KeystoneAbrevHash(ctx context.Context, l2Keyston sql := ` WITH relevant_pop_basis AS ( SELECT l2_keystone_abrev_hash, btc_block_hash FROM pop_basis WHERE btc_block_hash = ANY( - SELECT hash FROM btc_blocks_can ORDER BY height DESC LIMIT 100 + SELECT hash FROM btc_blocks ORDER BY height DESC LIMIT 100 ) ), l2_keystones_lowest_btc_block AS ( @@ -586,18 +609,18 @@ func (p *pgdb) L2BTCFinalityByL2KeystoneAbrevHash(ctx context.Context, l2Keyston l2_keystones.state_root, l2_keystones.ep_hash, l2_keystones.version, - btc_blocks_can_tmp.hash AS btc_block_hash, - btc_blocks_can_tmp.height AS btc_block_height + btc_blocks_tmp.hash AS btc_block_hash, + btc_blocks_tmp.height AS btc_block_height FROM l2_keystones LEFT JOIN LATERAL ( - SELECT pop_basis.l2_keystone_abrev_hash, btc_blocks_can.hash, btc_blocks_can.height + SELECT pop_basis.l2_keystone_abrev_hash, btc_blocks.hash, btc_blocks.height FROM pop_basis LEFT JOIN LATERAL ( - SELECT hash, height FROM btc_blocks_can WHERE hash = pop_basis.btc_block_hash + SELECT hash, height FROM btc_blocks WHERE hash = pop_basis.btc_block_hash ORDER BY height ASC LIMIT 1 - ) btc_blocks_can ON TRUE + ) btc_blocks ON TRUE WHERE pop_basis.l2_keystone_abrev_hash = l2_keystones.l2_keystone_abrev_hash LIMIT 1 - ) btc_blocks_can_tmp ON TRUE + ) btc_blocks_tmp ON TRUE WHERE l2_keystones.l2_keystone_abrev_hash = ANY($1) ) SELECT @@ -614,18 +637,18 @@ func (p *pgdb) L2BTCFinalityByL2KeystoneAbrevHash(ctx context.Context, l2Keyston COALESCE((SELECT height FROM ( - SELECT height FROM btc_blocks_can + SELECT height FROM btc_blocks LEFT JOIN relevant_pop_basis ON relevant_pop_basis.btc_block_hash - = btc_blocks_can.hash + = btc_blocks.hash LEFT JOIN l2_keystones ll ON ll.l2_keystone_abrev_hash = relevant_pop_basis.l2_keystone_abrev_hash AND ll.l2_block_number >= l2_keystones_lowest_btc_block.l2_block_number - WHERE height > (SELECT height FROM btc_blocks_can ORDER BY height DESC LIMIT 1) - 100 + WHERE height > (SELECT height FROM btc_blocks ORDER BY height DESC LIMIT 1) - 100 AND ll.l2_keystone_abrev_hash IS NOT NULL ORDER BY height ASC LIMIT 1 )), 0), - COALESCE((SELECT height FROM btc_blocks_can ORDER BY height DESC LIMIT 1),0) + COALESCE((SELECT height FROM btc_blocks ORDER BY height DESC LIMIT 1),0) FROM l2_keystones_lowest_btc_block @@ -685,7 +708,7 @@ func (p *pgdb) BtcBlockCanonicalHeight(ctx context.Context) (uint64, error) { log.Tracef("BtcBlockCanonicalHeight") defer log.Tracef("BtcBlockCanonicalHeight exit") - const q = `SELECT COALESCE(MAX(height),0) FROM btc_blocks_can LIMIT 1` + const q = `SELECT COALESCE(MAX(height),0) FROM btc_blocks LIMIT 1` var result uint64 if err := p.db.QueryRowContext(ctx, q).Scan(&result); err != nil { @@ -695,70 +718,6 @@ func (p *pgdb) BtcBlockCanonicalHeight(ctx context.Context) (uint64, error) { return result, nil } -// BtcBlocksHeightsWithNoChildren returns the heights of blocks stored in the -// database that do not have any children, these represent possible forks that -// have not been handled yet. -func (p *pgdb) BtcBlocksHeightsWithNoChildren(ctx context.Context) ([]uint64, error) { - log.Tracef("BtcBlocksHeightsWithNoChildren") - defer log.Tracef("BtcBlocksHeightsWithNoChildren exit") - - // Query all heights from btc_blocks where the block does not have any - // children and there are no other blocks at the same height with children. - // Excludes the tip because it will not have any children. - const q = ` - SELECT height FROM btc_blocks bb1 - WHERE NOT EXISTS (SELECT * FROM btc_blocks bb2 WHERE substr(bb2.header, 5, 32) = bb1.hash) - AND NOT EXISTS ( - SELECT * FROM btc_blocks bb3 WHERE bb1.height = bb3.height - AND EXISTS ( - SELECT * FROM btc_blocks bb4 WHERE substr(bb4.header, 5, 32) = bb3.hash - ) - ) - ORDER BY height DESC - OFFSET $1 + 1 - LIMIT 100 - ` - - var heights []uint64 - for offset := 0; ; offset += 100 { - rows, err := p.db.QueryContext(ctx, q, offset) - if err != nil { - return nil, err - } - defer rows.Close() - - startingLength := len(heights) - for rows.Next() { - var v uint64 - if err := rows.Scan(&v); err != nil { - return nil, err - } - heights = append(heights, v) - } - - if startingLength == len(heights) { - return heights, nil - } - - if rows.Err() != nil { - return nil, rows.Err() - } - } -} - -func (p *pgdb) refreshBTCBlocksCanonical(ctx context.Context) error { - // XXX this probably should be REFRESH MATERIALIZED VIEW CONCURRENTLY - // however, this is more testable at the moment and we're in a time crunch, - // this works - sql := "REFRESH MATERIALIZED VIEW btc_blocks_can" - _, err := p.db.ExecContext(ctx, sql) - if err != nil { - return err - } - - return nil -} - func (p *pgdb) BtcTransactionBroadcastRequestInsert(ctx context.Context, serializedTx []byte, txId string) error { log.Tracef("BtcTransactionBroadcastRequestInsert") defer log.Tracef("BtcTransactionBroadcastRequestInsert exit") diff --git a/database/bfgd/scripts/0015.sql b/database/bfgd/scripts/0015.sql new file mode 100644 index 000000000..5ee8d1c96 --- /dev/null +++ b/database/bfgd/scripts/0015.sql @@ -0,0 +1,26 @@ +-- Copyright (c) 2025 Hemi Labs, Inc. +-- Use of this source code is governed by the MIT License, +-- which can be found in the LICENSE file. + +BEGIN; + +UPDATE version SET version = 15; + +DROP TRIGGER btc_blocks_canonical_refresh_btc_blocks ON btc_blocks; + +DROP FUNCTION refresh_btc_blocks_can(); + +-- no longer have BFG responsible for the canonical chain +DROP MATERIALIZED VIEW btc_blocks_can; + + +-- we now trust electrs/bitcoind to maintain the best chain, height becomes +-- unique in this table +ALTER TABLE btc_blocks ADD UNIQUE (height); + +-- when a btc block becomes "invalid" (orphaned), delete it and all pop_bases +-- that referenced it +ALTER TABLE pop_basis DROP CONSTRAINT pop_basis_btc_block_hash_fkey; +ALTER TABLE pop_basis ADD CONSTRAINT pop_basis_btc_block_hash_fkey FOREIGN KEY (btc_block_hash) REFERENCES btc_blocks (hash) ON DELETE CASCADE; + +COMMIT; \ No newline at end of file diff --git a/e2e/e2e_ext_test.go b/e2e/e2e_ext_test.go index 898a608b0..7d72e393b 100644 --- a/e2e/e2e_ext_test.go +++ b/e2e/e2e_ext_test.go @@ -66,7 +66,7 @@ const ( mockEncodedBlockHeader = "\"0000c02048cd664586152c3dcf356d010cbb9216fdeb3b1aeae256d59a0700000000000086182c855545356ec11d94972cf31b97ef01ae7c9887f4349ad3f0caf2d3c0b118e77665efdf2819367881fb\"" mockTxHash = "7fe9c3262f8fe26764b01955b4c996296f7c0c72945af1556038a084fcb37dbb" mockTxPos = 3 - mockTxheight = 2 + mockTxheight = 10 mockElectrsConnectTimeout = 3 * time.Second ) @@ -1705,7 +1705,7 @@ func TestProcessBitcoinBlockNewBtcBlock(t *testing.T) { } btcHeaderHash := btcchainhash.DoubleHashB(expectedBtcBlockHeader) - btcHeight := 2 + btcHeight := 10 btcHeader := expectedBtcBlockHeader // 2 @@ -1742,9 +1742,15 @@ loop: t.Fatalf("unexpected diff %s", diff) } - l2k, err := db.L2KeystonesMostRecentN(ctx, 100, 0) - if err != nil { - t.Fatal(err) + var l2k []bfgd.L2Keystone + for { + l2k, err = db.L2KeystonesMostRecentN(ctx, 100, 0) + if err != nil { + t.Fatal(err) + } + if l2k != nil { + break + } } // assert that the L2Keystone was stored in the database, diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index f8c1fca83..755dc140a 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -156,8 +156,6 @@ type Server struct { // if this grows we need to notify subscribers canonicalChainHeight uint64 - checkForInvalidBlocks chan struct{} - l2keystonesCache []hemi.L2Keystone btcHeightCache uint64 @@ -254,17 +252,16 @@ func NewServer(cfg *Config) (*Server, error) { ) } s := &Server{ - cfg: cfg, - requestLimiter: make(chan bool, cfg.RequestLimit), - btcHeight: cfg.BTCStartHeight, - server: http.NewServeMux(), - publicServer: http.NewServeMux(), - metrics: newMetrics(cfg), - sessions: make(map[string]*bfgWs), - checkForInvalidBlocks: make(chan struct{}), - holdoffTimeout: 6 * time.Second, - bfgCallTimeout: 20 * time.Second, - bfgCmdCh: make(chan bfgCmd), + cfg: cfg, + requestLimiter: make(chan bool, cfg.RequestLimit), + btcHeight: cfg.BTCStartHeight, + server: http.NewServeMux(), + publicServer: http.NewServeMux(), + metrics: newMetrics(cfg), + sessions: make(map[string]*bfgWs), + holdoffTimeout: 6 * time.Second, + bfgCallTimeout: 20 * time.Second, + bfgCmdCh: make(chan bfgCmd), } for range cfg.RequestLimit { s.requestLimiter <- true @@ -297,37 +294,6 @@ func NewServer(cfg *Config) (*Server, error) { return s, nil } -func (s *Server) queueCheckForInvalidBlocks() { - select { - case s.checkForInvalidBlocks <- struct{}{}: - default: - } -} - -func (s *Server) invalidBlockChecker(ctx context.Context) { - defer s.wg.Done() - for { - select { - case <-ctx.Done(): - return - case <-s.checkForInvalidBlocks: - heights, err := s.db.BtcBlocksHeightsWithNoChildren(ctx) - if err != nil { - log.Errorf("error trying to get heights for btc blocks: %s", err) - return - } - - log.Infof("received %d heights with no children, will re-check", len(heights)) - for _, height := range heights { - log.Infof("reprocessing block at height %d", height) - if err := s.processBitcoinBlock(ctx, height); err != nil { - log.Errorf("error processing bitcoin block: %s", err) - } - } - } - } -} - // handleRequest is called as a go routine to handle a long-lived command. func (s *Server) handleRequest(pctx context.Context, bws *bfgWs, wsid string, cmd protocol.Command, handler func(ctx context.Context) (any, error)) { log.Tracef("handleRequest: %v", bws.addr) @@ -399,7 +365,14 @@ func (s *Server) handleOneBroadcastRequest(pctx context.Context, highPriority bo serializedTx, err := s.db.BtcTransactionBroadcastRequestGetNext(ctx, highPriority) if err != nil { log.Errorf("error getting next broadcast request: %v", err) - return + + // if there is a communication error, backoff a bit + select { + case <-time.After(1 * time.Second): + return + case <-ctx.Done(): + return + } } // if there are no new serialized txs, backoff a bit @@ -590,10 +563,14 @@ func (s *Server) handleBitcoinUTXOs(ctx context.Context, bur *bfgapi.BitcoinUTXO return buResp, nil } +var ErrAlreadyProcessed error = fmt.Errorf("Already Processed BTC Block") + func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { log.Tracef("Processing Bitcoin block at height %d...", height) - rbh, err := s.btcClient.RawBlockHeader(ctx, height) + netCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + rbh, err := s.btcClient.RawBlockHeader(netCtx, height) + cancel() if err != nil { return fmt.Errorf("get block header at height %v: %w", height, err) } @@ -607,29 +584,41 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { btcHeight := height btcHeader := rbh + btcBlockTmpChk, err := s.db.BtcBlockByHash(ctx, [32]byte(btcHeaderHash)) + if err != nil && !errors.Is(err, database.ErrNotFound) { + return err + } + + // block with hash is already at height, no-reorg + if btcBlockTmpChk != nil && btcBlockTmpChk.Height == btcHeight { + return fmt.Errorf("already processed block block: %w", ErrAlreadyProcessed) + } + btcBlock := bfgd.BtcBlock{ Hash: btcHeaderHash, Header: btcHeader[:], Height: btcHeight, } - err = s.db.BtcBlockInsert(ctx, &btcBlock) - if err != nil { - // XXX don't return err here so we keep counting up, need to be smarter - if errors.Is(err, database.ErrDuplicate) { - log.Errorf("could not insert btc block: %s", err) - return nil - } - } + // these might get quite large; we store all found keystones and + // pop bases here to insert at the end + // we will likely find many of the same keystones so store them in a map + // to remove duplicates + l2Keystones := map[string]bfgd.L2Keystone{} + popBases := []bfgd.PopBasis{} for index := uint64(0); ; index++ { - txHash, merkleHashes, err := s.btcClient.TransactionAtPosition(ctx, + log.Tracef("calling tx at pos") + netCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + txHash, merkleHashes, err := s.btcClient.TransactionAtPosition(netCtx, height, index) + cancel() + log.Tracef("done calling tx as pos") if err != nil { - if errors.Is(err, electrs.ErrNoTxAtPosition) { + if errors.Is(err, electrs.ErrNoTxAtPosition) || strings.HasSuffix(err.Error(), "no tx at position") { // There is no way to tell how many transactions are // in a block, so hopefully we've got them all... - return nil + break } return fmt.Errorf("get transaction at position (height %v, index %v): %w", height, index, err) } @@ -652,7 +641,9 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { log.Infof("btc tx is valid with hash %s", txHashEncoded) } - rtx, err := s.btcClient.RawTransaction(ctx, txHash) + netCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + rtx, err := s.btcClient.RawTransaction(netCtx, txHash) + cancel() if err != nil { return fmt.Errorf("get raw transaction with txid %x: %w", txHash, err) } @@ -682,14 +673,8 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { btcTxIndex := index log.Infof("found tl2: %v at position %d", tl2, btcTxIndex) - // attempt to insert the abbreviated keystone, this is in case we have - // not heard of this keystone from op node yet - if err := s.db.L2KeystonesInsert(ctx, []bfgd.L2Keystone{ - hemiL2KeystoneAbrevToDb(*tl2.L2Keystone), - }); err != nil { - // this is not necessarily an error, should it be trace? - log.Infof("could not insert l2 keystone: %s", err) - } + l2kdb := hemiL2KeystoneAbrevToDb(*tl2.L2Keystone) + l2Keystones[hex.EncodeToString(l2kdb.Hash)] = l2kdb publicKeyUncompressed, err := pop.ParsePublicKeyFromSignatureScript(mtx.TxIn[0].SignatureScript) if err != nil { @@ -716,6 +701,35 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { BtcMerklePath: merkleHashes, } + popBases = append(popBases, popBasis) + } + + // XXX: these database inserts should be in a transaction, or simply be atomic + // but they never have been. in the future, let's make sure they are with the + // next solution + // we need to AT LEAST try to insert them here, in case there are network timeouts + // with the above and electrs. + + rowsAffected, err := s.db.BtcBlockReplace(ctx, &btcBlock) + if err != nil { + return fmt.Errorf("error replacing bitcoin block: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("no rows affected: %w", ErrAlreadyProcessed) + } + + // this for loop seems weird but its used to check errors per keystone + for _, l2Keystone := range l2Keystones { + // attempt to insert the abbreviated keystone, this is in case we have + // not heard of this keystone from op node yet + if err := s.db.L2KeystonesInsert(ctx, []bfgd.L2Keystone{l2Keystone}); err != nil { + // this is not necessarily an error, should it be trace? + log.Infof("could not insert l2 keystone: %s", err) + } + } + + for _, popBasis := range popBases { // first, try to update a pop_basis row with NULL btc fields rowsAffected, err := s.db.PopBasisUpdateBTCFields(ctx, &popBasis) if err != nil { @@ -735,16 +749,7 @@ func (s *Server) processBitcoinBlock(ctx context.Context, height uint64) error { } } } -} -func (s *Server) processBitcoinBlocks(ctx context.Context, start, end uint64) error { - for i := start; i <= end; i++ { - if err := s.processBitcoinBlock(ctx, i); err != nil { - return fmt.Errorf("process bitcoin block at height %d: %w", i, err) - } - s.btcHeight = i - } - s.queueCheckForInvalidBlocks() return nil } @@ -754,9 +759,16 @@ func (s *Server) trackBitcoin(ctx context.Context) { log.Tracef("trackBitcoin") defer log.Tracef("trackBitcoin exit") + // upon startup we walk every block between the tip and our + // configured start block. IMPORTANT NOTE: we ONLY process + // transactions in blocks that we have not seen, so whilst we + // walk quite a few blocks, most will be essentially no-ops + // except for when you have an empty database table + initialWalk := true + btcInterval := 5 * time.Second ticker := time.NewTicker(btcInterval) - printMsg := true + for { select { case <-ctx.Done(): @@ -764,37 +776,54 @@ func (s *Server) trackBitcoin(ctx context.Context) { case <-ticker.C: log.Tracef("Checking BTC height...") - btcHeight, err := s.btcClient.Height(ctx) + netCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + btcHeight, err := s.btcClient.Height(netCtx) + cancel() if err != nil { - if printMsg { - // XXX add this to prometheus - log.Errorf("Failed to get Bitcoin height: %v", err) - printMsg = false - } + // XXX add this to prometheus + log.Errorf("Failed to get Bitcoin height: %v", err) continue } s.updateBtcHeightCache(btcHeight) - printMsg = true - if s.btcHeight > btcHeight { - // XXX do we need this check? - log.Errorf("invalid height: current %v > requested %v", - btcHeight, s.btcHeight) - continue - } - if btcHeight <= s.btcHeight { + err = s.walkChain(ctx, btcHeight, !initialWalk) + if err != nil { + log.Errorf("could not walk chain: %s", err) continue } - log.Infof("Bitcoin block height increased to %v", btcHeight) + // after we have done the initial walk with no errors, + // in the future we only walk back until a block that we've seen + initialWalk = false + } + } +} - if err := s.processBitcoinBlocks(ctx, s.btcHeight+1, btcHeight); err != nil { - log.Errorf("Failed to process Bitcoin blocks: %v", err) - continue +func (s *Server) walkChain(ctx context.Context, tip uint64, exitFast bool) error { + log.Tracef("walkChain") + defer log.Tracef("walkChain exit") + + log.Tracef("starting to walk chain; tip=%d, s.cfg.BTCStartHeight=%d, exitFast=%b", tip, s.cfg.BTCStartHeight, exitFast) + for tip >= s.cfg.BTCStartHeight { + log.Tracef("walkChain progress; processing block at height %d", tip) + err := s.processBitcoinBlock(ctx, tip) + if errors.Is(err, ErrAlreadyProcessed) { + log.Infof("block known at height %d", tip) + + // if we have already seen the block, and the caller wishes + // to exit on first known block, do so + if exitFast { + return nil } + } else if err != nil { + return err } + + tip-- } + + return nil } type bfgWs struct { @@ -1759,23 +1788,6 @@ func (s *Server) Run(pctx context.Context) error { } defer s.db.Close() - if s.btcHeight, err = s.BtcBlockCanonicalHeight(ctx); err != nil { - return err - } - - // if there is no height in the db, check the config - if s.btcHeight == 0 { - s.btcHeight = s.cfg.BTCStartHeight - log.Infof("received height of 0 from the db, height of %v from config", - s.cfg.BTCStartHeight) - } - - // if the config doesn't set a height, error - if s.btcHeight == 0 { - return errors.New("could not determine btc start height") - } - log.Debugf("resuming at height %d", s.btcHeight) - // Database notifications btcBlocksPayload, ok := bfgd.NotificationPayload(bfgd.NotificationBtcBlocks) if !ok { @@ -1976,9 +1988,6 @@ func (s *Server) Run(pctx context.Context) error { s.wg.Add(1) go s.trackBitcoin(ctx) - s.wg.Add(1) - go s.invalidBlockChecker(ctx) - select { case <-ctx.Done(): err = ctx.Err()