-
Notifications
You must be signed in to change notification settings - Fork 184
/
query_test.go
416 lines (356 loc) · 11.7 KB
/
query_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
package neutrino
import (
"compress/bzip2"
"encoding/binary"
"fmt"
"io"
"math/big"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/gcs"
"github.com/btcsuite/btcd/btcutil/gcs/builder"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/cache/lru"
"github.com/lightninglabs/neutrino/filterdb"
"github.com/lightninglabs/neutrino/headerfs"
"github.com/lightninglabs/neutrino/query"
"github.com/stretchr/testify/require"
)
var (
// maxPowLimit is used as the max block target to ensure all PoWs are
// valid.
bigOne = big.NewInt(1)
maxPowLimit = new(big.Int).Sub(new(big.Int).Lsh(bigOne, 255), bigOne)
// blockDataNet is the expected network in the test block data.
blockDataNet = wire.MainNet
// blockDataFile is the path to a file containing the first 256 blocks
// of the block chain.
blockDataFile = filepath.Join("testdata", "blocks1-256.bz2")
)
// loadBlocks loads the blocks contained in the testdata directory and returns
// a slice of them.
//
// NOTE: copied from btcsuite/btcd/database/ffldb/interface_test.go.
func loadBlocks(t *testing.T, dataFile string, network wire.BitcoinNet) (
[]*btcutil.Block, error) {
// Open the file that contains the blocks for reading.
fi, err := os.Open(dataFile)
if err != nil {
t.Errorf("failed to open file %v, err %v", dataFile, err)
return nil, err
}
defer func() {
if err := fi.Close(); err != nil {
t.Errorf("failed to close file %v %v", dataFile,
err)
}
}()
dr := bzip2.NewReader(fi)
// Set the first block as the genesis block.
blocks := make([]*btcutil.Block, 0, 256)
genesis := btcutil.NewBlock(chaincfg.MainNetParams.GenesisBlock)
blocks = append(blocks, genesis)
// Load the remaining blocks.
for height := 1; ; height++ {
var net uint32
err := binary.Read(dr, binary.LittleEndian, &net)
if err == io.EOF {
// Hit end of file at the expected offset. No error.
break
}
if err != nil {
t.Errorf("Failed to load network type for block %d: %v",
height, err)
return nil, err
}
if net != uint32(network) {
t.Errorf("Block doesn't match network: %v expects %v",
net, network)
return nil, err
}
var blockLen uint32
err = binary.Read(dr, binary.LittleEndian, &blockLen)
if err != nil {
t.Errorf("Failed to load block size for block %d: %v",
height, err)
return nil, err
}
// Read the block.
blockBytes := make([]byte, blockLen)
_, err = io.ReadFull(dr, blockBytes)
if err != nil {
t.Errorf("Failed to load block %d: %v", height, err)
return nil, err
}
// Deserialize and store the block.
block, err := btcutil.NewBlockFromBytes(blockBytes)
if err != nil {
t.Errorf("Failed to parse block %v: %v", height, err)
return nil, err
}
blocks = append(blocks, block)
}
return blocks, nil
}
// genRandomBlockHash generates a random block hash using math/rand.
func genRandomBlockHash() *chainhash.Hash {
var seed [32]byte
rand.Read(seed[:])
hash := chainhash.Hash(seed)
return &hash
}
// getRandFilter generates a random GCS filter that contains numElements. It
// will then convert that filter into CacheableFilter to compute it's size for
// convenience. It will return the filter along with it's size and randomly
// generated block hash. testing.T is passed in as a convenience to deal with
// errors in this method and making the test code more straightforward. Method
// originally taken from filterdb/db_test.go.
func genRandFilter(numElements uint32, t *testing.T) (
*chainhash.Hash, *gcs.Filter, uint64) {
elements := make([][]byte, numElements)
for i := uint32(0); i < numElements; i++ {
var elem [20]byte
if _, err := rand.Read(elem[:]); err != nil {
t.Fatalf("unable to create random filter: %v", err)
return nil, nil, 0
}
elements[i] = elem[:]
}
var key [16]byte
if _, err := rand.Read(key[:]); err != nil {
t.Fatalf("unable to create random filter: %v", err)
return nil, nil, 0
}
filter, err := gcs.BuildGCSFilter(
builder.DefaultP, builder.DefaultM, key, elements)
if err != nil {
t.Fatalf("unable to create random filter: %v", err)
return nil, nil, 0
}
// Convert into CacheableFilter and compute Size.
c := &CacheableFilter{Filter: filter}
s, err := c.Size()
if err != nil {
t.Fatalf("unable to create random filter: %v", err)
return nil, nil, 0
}
return genRandomBlockHash(), filter, s
}
// getFilter is a convenience method which will extract a value from the cache
// and handle errors, it makes the test code easier to follow.
func getFilter(cs *ChainService, b *chainhash.Hash, t *testing.T) *gcs.Filter {
val, err := cs.getFilterFromCache(b, filterdb.RegularFilter)
if err != nil {
t.Fatal(err)
}
return val
}
func assertEqual(t *testing.T, a interface{}, b interface{}, message string) { // nolint:unparam
if a == b {
return
}
if len(message) == 0 {
message = fmt.Sprintf("%v != %v", a, b)
}
t.Fatal(message)
}
// TestCacheBigEnoughHoldsAllFilter creates a cache big enough to hold all
// filters, then gets them in random order and makes sure they are always there.
func TestCacheBigEnoughHoldsAllFilter(t *testing.T) {
// Create different sized filters.
b1, f1, s1 := genRandFilter(1, t)
b2, f2, s2 := genRandFilter(10, t)
b3, f3, s3 := genRandFilter(100, t)
cs := &ChainService{
FilterCache: lru.NewCache[FilterCacheKey, *CacheableFilter](
s1 + s2 + s3,
),
}
// Insert those filters into the cache making sure nothing gets evicted.
assertEqual(t, cs.FilterCache.Len(), 0, "")
cs.putFilterToCache(b1, filterdb.RegularFilter, f1)
assertEqual(t, cs.FilterCache.Len(), 1, "")
cs.putFilterToCache(b2, filterdb.RegularFilter, f2)
assertEqual(t, cs.FilterCache.Len(), 2, "")
cs.putFilterToCache(b3, filterdb.RegularFilter, f3)
assertEqual(t, cs.FilterCache.Len(), 3, "")
// Check that we can get those filters back independent of Get order.
assertEqual(t, getFilter(cs, b1, t), f1, "")
assertEqual(t, getFilter(cs, b2, t), f2, "")
assertEqual(t, getFilter(cs, b3, t), f3, "")
assertEqual(t, getFilter(cs, b2, t), f2, "")
assertEqual(t, getFilter(cs, b3, t), f3, "")
assertEqual(t, getFilter(cs, b1, t), f1, "")
assertEqual(t, getFilter(cs, b3, t), f3, "")
assertEqual(t, cs.FilterCache.Len(), 3, "")
}
// TestBigFilterEvictsEverything creates a cache big enough to hold a large
// filter and inserts many smaller filters into. Then it inserts the big filter
// and verifies that it's the only one remaining.
func TestBigFilterEvictsEverything(t *testing.T) {
// Create different sized filters.
b1, f1, _ := genRandFilter(1, t)
b2, f2, _ := genRandFilter(3, t)
b3, f3, s3 := genRandFilter(10, t)
cs := &ChainService{
FilterCache: lru.NewCache[FilterCacheKey, *CacheableFilter](
s3,
),
}
// Insert the smaller filters.
assertEqual(t, cs.FilterCache.Len(), 0, "")
cs.putFilterToCache(b1, filterdb.RegularFilter, f1)
assertEqual(t, cs.FilterCache.Len(), 1, "")
cs.putFilterToCache(b2, filterdb.RegularFilter, f2)
assertEqual(t, cs.FilterCache.Len(), 2, "")
// Insert the big filter and check all previous filters are evicted.
cs.putFilterToCache(b3, filterdb.RegularFilter, f3)
assertEqual(t, cs.FilterCache.Len(), 1, "")
assertEqual(t, getFilter(cs, b3, t), f3, "")
}
// TestBlockCache checks that blocks are inserted and fetched from the cache
// before peers are queried.
func TestBlockCache(t *testing.T) {
t.Parallel()
// Load the first 255 blocks from disk.
blocks, err := loadBlocks(t, blockDataFile, blockDataNet)
if err != nil {
t.Fatalf("loadBlocks: Unexpected error: %v", err)
}
// We'll use a simple mock header store since the GetBlocks method
// assumes we only query for blocks with an already known header.
headers := newMockBlockHeaderStore()
// Iterate through the blocks, calculating the size of half of them,
// and writing them to the header store.
var size uint64
for i, b := range blocks {
header := headerfs.BlockHeader{
BlockHeader: &b.MsgBlock().Header,
Height: uint32(i),
}
headers.WriteHeaders(header)
sz, _ := (&CacheableBlock{Block: b}).Size()
if i < len(blocks)/2 {
size += sz
}
}
// Set up a ChainService with a BlockCache that can fit the first half
// of the blocks.
cs := &ChainService{
BlockCache: lru.NewCache[wire.InvVect, *CacheableBlock](
size,
),
BlockHeaders: headers,
chainParams: chaincfg.Params{
PowLimit: maxPowLimit,
},
timeSource: blockchain.NewMedianTime(),
workManager: &mockDispatcher{},
}
// We'll set up the queryPeers method to make sure we are only querying
// for blocks, and send the block hashes queried over the queries
// channel.
queries := make(chan chainhash.Hash, 1)
cs.workManager.(*mockDispatcher).query = func(reqs []*query.Request,
opts ...query.QueryOption) chan error {
errChan := make(chan error, 1)
defer close(errChan)
require.Len(t, reqs, 1)
require.IsType(t, &wire.MsgGetData{}, reqs[0].Req)
getData := reqs[0].Req.(*wire.MsgGetData)
require.Len(t, getData.InvList, 1)
inv := getData.InvList[0]
require.Equal(t, wire.InvTypeWitnessBlock, inv.Type)
// Serve the block that matches the requested block header.
for _, b := range blocks {
if *b.Hash() != inv.Hash {
continue
}
header, _, err := headers.FetchHeader(b.Hash())
require.NoError(t, err)
resp := &wire.MsgBlock{
Header: *header,
Transactions: b.MsgBlock().Transactions,
}
progress := reqs[0].HandleResp(getData, resp, "")
require.True(t, progress.Progressed)
require.True(t, progress.Finished)
// Notify the test about the query.
select {
case queries <- inv.Hash:
case <-time.After(1 * time.Second):
t.Fatalf("query was not handled")
}
return errChan
}
t.Fatalf("queried for unknown block: %v", inv.Hash)
return errChan
}
// fetchAndAssertPeersQueried calls GetBlock and makes sure the block
// is fetched from the peers.
fetchAndAssertPeersQueried := func(hash chainhash.Hash) {
found, err := cs.GetBlock(hash)
if err != nil {
t.Fatalf("error getting block: %v", err)
}
if *found.Hash() != hash {
t.Fatalf("requested block with hash %v, got %v",
hash, found.Hash())
}
select {
case q := <-queries:
if q != hash {
t.Fatalf("expected hash %v to be queried, "+
"got %v", hash, q)
}
case <-time.After(1 * time.Second):
t.Fatalf("did not query peers for block")
}
}
// fetchAndAssertInCache calls GetBlock and makes sure the block is not
// fetched from the peers.
fetchAndAssertInCache := func(hash chainhash.Hash) {
found, err := cs.GetBlock(hash)
if err != nil {
t.Fatalf("error getting block: %v", err)
}
if *found.Hash() != hash {
t.Fatalf("requested block with hash %v, got %v",
hash, found.Hash())
}
// Make sure we didn't query the peers for this block.
select {
case q := <-queries:
t.Fatalf("did not expect query for block %v", q)
default:
}
}
// Get the first half of the blocks. Since this is the first time we
// request them, we expect them all to be fetched from peers.
for _, b := range blocks[:len(blocks)/2] {
fetchAndAssertPeersQueried(*b.Hash())
}
// Get the first half of the blocks again. This time we expect them all
// to be fetched from the cache.
for _, b := range blocks[:len(blocks)/2] {
fetchAndAssertInCache(*b.Hash())
}
// Get the second half of the blocks. These have not been fetched
// before, and we expect them to be fetched from peers.
for _, b := range blocks[len(blocks)/2:] {
fetchAndAssertPeersQueried(*b.Hash())
}
// Since the cache only had capacity for the first half of the blocks,
// some of these should now have been evicted. We only check the first
// one, since we cannot know for sure how many because of the variable
// size.
b := blocks[0]
fetchAndAssertPeersQueried(*b.Hash())
}