Commit

reintroduce resume, tweak configurations
luizirber committed Jan 8, 2025
1 parent b67e1d2 commit ef31227
Showing 3 changed files with 46 additions and 7 deletions.
33 changes: 32 additions & 1 deletion src/core/src/index/revindex/disk_revindex.rs
@@ -87,12 +87,18 @@ impl RevIndex {
let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap());

let collection = Arc::new(collection);
info!("sigs in the new index once finished: {}", collection.len());
let processed = Arc::new(RwLock::new(Self::load_processed(
db.clone(),
collection.clone(),
true,
)?));

let left_to_process = collection.len() - processed.read().unwrap().len();
if left_to_process != collection.len() {
info!("sigs left to process: {}", left_to_process);
}

let index = Self {
db,
collection,
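
This hunk reintroduces resume support: when an index is opened, the set of datasets already written to it is loaded from the database and only the remainder is reported. Below is a minimal sketch of that check, assuming (as illustration only, since Self::load_processed and the Datasets type are not shown here) that the processed ids are stored under a metadata key as a flat list of little-endian u32 values; decode_ids and left_to_process are hypothetical helpers.

use std::collections::HashSet;

use byteorder::{LittleEndian, ReadBytesExt};
use rocksdb::DB;

// Hypothetical helper: decode a buffer of little-endian u32 dataset ids.
// The real on-disk encoding is defined by Datasets::as_bytes (see mod.rs below).
fn decode_ids(mut buf: &[u8]) -> HashSet<u32> {
    let mut ids = HashSet::new();
    while let Ok(id) = buf.read_u32::<LittleEndian>() {
        ids.insert(id);
    }
    ids
}

// Sketch of the resume check: read the processed-ids value from the metadata
// column family and report how many datasets still need to be indexed.
fn left_to_process(db: &DB, metadata_cf: &str, processed_key: &[u8], total: usize) -> usize {
    let cf = db.cf_handle(metadata_cf).expect("missing metadata column family");
    let processed = db
        .get_cf(&cf, processed_key)
        .expect("error reading processed ids")
        .map(|v| decode_ids(&v))
        .unwrap_or_default();
    total.saturating_sub(processed.len())
}

As in the hunk above, a resumed run would only log the remaining count when it differs from the full collection size.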
@@ -155,8 +161,20 @@ impl RevIndex {

// if cached in a new field in the RevIndex,
// then update the cache too

processed.write().unwrap().extend(dataset_ids);

// finished processing of this batch,
// do a merge_cf in the PROCESSED key in metadata
// to account for processed datasets in this batch.
let cf_metadata = index.db.cf_handle(METADATA).unwrap();
index
.db
.merge_cf(
&cf_metadata,
PROCESSED,
processed.read().unwrap().as_bytes().unwrap().as_slice(),
)
.expect("error merging");
});

info!("Compact SSTs");
@@ -555,6 +573,19 @@ impl RevIndexOps for RevIndex {
// then update the cache too

processed.write().unwrap().extend(dataset_ids);

// finished processing of this batch,
// do a merge_cf in the PROCESSED key in metadata
// to account for processed datasets in this batch.
let cf_metadata = self.db.cf_handle(METADATA).unwrap();
self
.db
.merge_cf(
&cf_metadata,
PROCESSED,
processed.read().unwrap().as_bytes().unwrap().as_slice(),
)
.expect("error merging");
});

info!("Compact SSTs");
4 changes: 2 additions & 2 deletions src/core/src/index/revindex/mod.rs
@@ -290,7 +290,7 @@ impl Datasets {
fn from_slice(slice: &[u8]) -> Option<Self> {
use byteorder::ReadBytesExt;

if slice.len() == 8 {
if slice.len() == 8 || slice.len() == 4 {
// Unique
Some(Self::Unique(
(&slice[..]).read_u32::<LittleEndian>().unwrap(),
@@ -308,7 +308,7 @@
match self {
Self::Empty => Some(vec![42_u8]),
Self::Unique(v) => {
let mut buf = vec![0u8; 8];
let mut buf = vec![0u8; 4];
(&mut buf[..])
.write_u32::<LittleEndian>(*v)
.expect("error writing bytes");
16 changes: 12 additions & 4 deletions src/core/src/storage/rocksdb.rs
@@ -92,7 +92,12 @@ pub(crate) fn cf_descriptors(cache: rocksdb::Cache) -> Vec<ColumnFamilyDescripto
cfopts.set_blob_compression_type(rocksdb::DBCompressionType::Zstd);
*/

cfopts.set_max_write_buffer_number(16);
cfopts.set_max_write_buffer_number(32);
cfopts.set_level_zero_file_num_compaction_trigger(8);
cfopts.set_max_bytes_for_level_multiplier(8.0);
// write_buffer_size (64 MiB) * write_buffer_number (32) == 2 GiB
cfopts.set_max_bytes_for_level_base(1024 << 21); // 2 GiB
cfopts.set_max_background_jobs(rayon::current_num_threads() as i32);
cfopts.set_merge_operator_associative(
"datasets operator",
crate::index::revindex::disk_revindex::merge_datasets,
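
The new options raise the per-column-family memtable budget and size L1 to match: with RocksDB's default 64 MiB write_buffer_size, 32 write buffers allow up to 64 MiB * 32 == 2 GiB of memtables, and 1024 << 21 bytes is likewise 2^31 bytes == 2 GiB for max_bytes_for_level_base, with each further level 8x larger and background jobs scaled to the rayon thread pool. A standalone sketch of the same knobs (the 64 MiB default is an assumption used only for the arithmetic, not something set in this diff):

use rocksdb::Options;

fn tuned_cf_options() -> Options {
    let mut cfopts = Options::default();

    // 32 memtables at the default 64 MiB each: up to 2 GiB buffered per column family.
    cfopts.set_max_write_buffer_number(32);

    // Wait for 8 L0 files before compacting, and grow each level 8x over the previous one.
    cfopts.set_level_zero_file_num_compaction_trigger(8);
    cfopts.set_max_bytes_for_level_multiplier(8.0);

    // 1024 << 21 == 2^31 bytes == 2 GiB, matching the memtable budget above.
    cfopts.set_max_bytes_for_level_base(1024 << 21);

    // Scale flush/compaction jobs with the rayon thread pool used for indexing.
    cfopts.set_max_background_jobs(rayon::current_num_threads() as i32);

    cfopts
}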
@@ -128,7 +133,8 @@ pub(crate) fn cf_descriptors(cache: rocksdb::Cache) -> Vec<ColumnFamilyDescripto
let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts);

let mut cfopts = db_options();
cfopts.set_max_write_buffer_number(16);
cfopts.set_max_write_buffer_number(32);
cfopts.set_max_background_jobs(rayon::current_num_threads() as i32);
cfopts.set_merge_operator_associative(
"datasets operator",
crate::index::revindex::disk_revindex::merge_datasets,
@@ -139,14 +145,16 @@ pub(crate) fn cf_descriptors(cache: rocksdb::Cache) -> Vec<ColumnFamilyDescripto
let cf_metadata = ColumnFamilyDescriptor::new(METADATA, cfopts);

let mut cfopts = db_options();
cfopts.set_max_write_buffer_number(16);
cfopts.set_max_write_buffer_number(32);
cfopts.set_max_background_jobs(rayon::current_num_threads() as i32);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);

let cf_storage = ColumnFamilyDescriptor::new(STORAGE, cfopts);

let mut cfopts = db_options();
cfopts.set_max_write_buffer_number(16);
cfopts.set_max_write_buffer_number(32);
cfopts.set_max_background_jobs(rayon::current_num_threads() as i32);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);

