-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tx cache #110
base: new-index
Are you sure you want to change the base?
Tx cache #110
Changes from all commits
82997ca
7dee400
1a680b0
32d2316
18fad46
bfc323a
ece6e73
3ffbda0
33a8d22
9753745
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -309,6 +309,18 @@ impl Mempool { | |
txids.push(txid); | ||
self.txstore.insert(txid, tx); | ||
} | ||
|
||
// Populate tx cache | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there an advantage to populating the cache with mempool transactions, which already are stored entirely in memory? Also, it seems that the cache is only used when looking up on-chain txs (via There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Well, when the mempool is evicted they can still be looked up from the tx cache. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Right, but why store them while they're still in the mempool and not wait until they get confirmed (and cached naturally when they're used)? Also, it's possible that by the time they confirm, they will no longer be in the FIFO cache |
||
let txid_misses = self.chain.txs_cache_miss(&txids); | ||
let mut tx_misses = vec![]; | ||
for txid in txid_misses { | ||
if let Some(tx) = self.txstore.get(&txid) { | ||
let bytes = serialize(tx); | ||
tx_misses.push((txid, bytes)); | ||
} | ||
} | ||
self.chain.add_txs_to_cache(&tx_misses); | ||
|
||
// Phase 2: index history and spend edges (can fail if some txos cannot be found) | ||
let txos = match self.lookup_txos(self.get_prevouts(&txids)) { | ||
Ok(txos) => txos, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,10 +2,12 @@ use bitcoin::hashes::sha256d::Hash as Sha256dHash; | |
#[cfg(not(feature = "liquid"))] | ||
use bitcoin::merkle_tree::MerkleBlock; | ||
use bitcoin::VarInt; | ||
use bitcoin_slices::SliceCache; | ||
use crypto::digest::Digest; | ||
use crypto::sha2::Sha256; | ||
use hex::FromHex; | ||
use itertools::Itertools; | ||
use prometheus::IntCounter; | ||
use rayon::prelude::*; | ||
|
||
#[cfg(not(feature = "liquid"))] | ||
|
@@ -17,9 +19,12 @@ use elements::{ | |
AssetId, | ||
}; | ||
|
||
use std::collections::{BTreeSet, HashMap, HashSet}; | ||
use std::path::Path; | ||
use std::sync::{Arc, RwLock}; | ||
use std::{ | ||
collections::{BTreeSet, HashMap, HashSet}, | ||
sync::Mutex, | ||
}; | ||
|
||
use crate::chain::{ | ||
BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, | ||
|
@@ -198,6 +203,11 @@ pub struct ChainQuery { | |
light_mode: bool, | ||
duration: HistogramVec, | ||
network: Network, | ||
|
||
/// By default 1GB of cached transactions, `Txid -> Transaction` | ||
txs_cache: Mutex<SliceCache<Txid>>, | ||
cache_hit: IntCounter, | ||
cache_miss: IntCounter, | ||
} | ||
|
||
// TODO: &[Block] should be an iterator / a queue. | ||
|
@@ -360,6 +370,9 @@ impl ChainQuery { | |
HistogramOpts::new("query_duration", "Index query duration (in seconds)"), | ||
&["name"], | ||
), | ||
txs_cache: Mutex::new(SliceCache::new(config.tx_cache_size << 20)), | ||
cache_hit: metrics.counter(MetricOpts::new("tx_cache_hit", "Tx cache Hit")), | ||
cache_miss: metrics.counter(MetricOpts::new("tx_cache_miss", "Tx cache Miss")), | ||
} | ||
} | ||
|
||
|
@@ -838,7 +851,16 @@ impl ChainQuery { | |
pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option<Bytes> { | ||
let _timer = self.start_timer("lookup_raw_txn"); | ||
|
||
if self.light_mode { | ||
if let Ok(cache) = self.txs_cache.lock() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it expected that locking may sometimes fail and that the failure should be ignored? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think it is expected to fail, but since here everything would work normally in case of failure (just perf degradation) I preferred this way. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A poisoned mutex is likely to indicate some underlying coding issue; wouldn't it be better to let it error visibly? I would at least log a warn-level message instead of silently ignoring it, but generally my preference is the "fail fast" approach for unanticipated errors. Failing fast (and restarting) would also re-enable the tx cache, instead of having it continue with degraded performance (until the process eventually restarts for another reason). |
||
if let Some(bytes) = cache.get(txid) { | ||
self.cache_hit.inc(); | ||
return Some(bytes.to_vec()); | ||
} else { | ||
self.cache_miss.inc(); | ||
} | ||
} | ||
|
||
let result = if self.light_mode { | ||
let queried_blockhash = | ||
blockhash.map_or_else(|| self.tx_confirming_block(txid).map(|b| b.hash), |_| None); | ||
let blockhash = blockhash.or_else(|| queried_blockhash.as_ref())?; | ||
|
@@ -848,9 +870,35 @@ impl ChainQuery { | |
.gettransaction_raw(txid, blockhash, false) | ||
.ok()?; | ||
let txhex = txval.as_str().expect("valid tx from bitcoind"); | ||
Some(Bytes::from_hex(txhex).expect("valid tx from bitcoind")) | ||
let vec = Bytes::from_hex(txhex).expect("valid tx from bitcoind"); | ||
|
||
Some(vec) | ||
} else { | ||
self.store.txstore_db.get(&TxRow::key(&txid[..])) | ||
}; | ||
if let Some(result) = result.as_ref() { | ||
self.add_txs_to_cache(&[(*txid, result)]); | ||
} | ||
result | ||
} | ||
|
||
pub fn txs_cache_miss(&self, txids: &[Txid]) -> Vec<Txid> { | ||
let mut result = vec![]; | ||
if let Ok(cache) = self.txs_cache.lock() { | ||
for txid in txids { | ||
if !cache.contains(txid) { | ||
result.push(*txid); | ||
} | ||
} | ||
} | ||
result | ||
} | ||
|
||
pub fn add_txs_to_cache<T: AsRef<[u8]>>(&self, txs: &[(Txid, T)]) { | ||
if let Ok(mut cache) = self.txs_cache.lock() { | ||
for (txid, tx) in txs { | ||
let _ = cache.insert(*txid, &tx.as_ref()); | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean to include this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, just a draft...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And by the way, I was trying to understand why it was failing in my test run, but the log, even changed, wasn't helping; in the end the error was "rpc queue limit reached"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were you running with #89? If so, note that it requires adjusting bitcoind's `rpcworkqueue` and `rpcthreads` options upwards.