feat: Handle loading > 1000 Segment Info files
mgattozzi committed Jan 25, 2024
1 parent 0de5458 commit 9f81439
Showing 1 changed file with 74 additions and 31 deletions.
influxdb3_write/src/persister.rs (105 changes: 74 additions & 31 deletions)
@@ -139,40 +139,60 @@ impl Persister for PersisterImpl {
        }
    }

-    async fn load_segments(&self, most_recent_n: usize) -> Result<Vec<PersistedSegment>> {
-        let segment_list = self
-            .object_store
-            .list(Some(&SegmentInfoFilePath::dir()))
-            .await?
-            .collect::<Vec<_>>()
-            .await;
-
-        // Why not collect into a Result<Vec<ObjectMeta>, object_store::Error>
-        // like we could with Iterators? Well, because it's a stream it ends up
-        // using different traits and can't really do that. So we need to loop
-        // through to return any errors that might have occurred, then do an
-        // unstable sort (which is faster and we know won't have any
-        // duplicates) since these can arrive out of order, and then issue gets
-        // on the n most recent segments that we want, which are returned in
-        // order from most recent to least.
-        let mut list = Vec::new();
-        for segment in segment_list {
-            list.push(segment?);
-        }
+    async fn load_segments(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSegment>> {
+        let mut output = Vec::new();
+        let mut offset: Option<ObjPath> = None;
+        while most_recent_n > 0 {
+            let count = if most_recent_n > 1000 {
+                most_recent_n -= 1000;
+                1000
+            } else {
+                let count = most_recent_n;
+                most_recent_n = 0;
+                count
+            };
+
+            let segment_list = if let Some(offset) = offset {
+                self.object_store
+                    .list_with_offset(Some(&SegmentInfoFilePath::dir()), &offset)
+                    .await?
+                    .collect::<Vec<_>>()
+                    .await
+            } else {
+                self.object_store
+                    .list(Some(&SegmentInfoFilePath::dir()))
+                    .await?
+                    .collect::<Vec<_>>()
+                    .await
+            };

-        list.sort_unstable_by(|a, b| a.location.cmp(&b.location));
+            // Why not collect into a Result<Vec<ObjectMeta>, object_store::Error>
+            // like we could with Iterators? Well, because it's a stream it ends up
+            // using different traits and can't really do that. So we need to loop
+            // through to return any errors that might have occurred, then do an
+            // unstable sort (which is faster and we know won't have any
+            // duplicates) since these can arrive out of order, and then issue gets
+            // on the n most recent segments that we want, which are returned in
+            // order from most recent to least.
+            let mut list = Vec::new();
+            for segment in segment_list {
+                list.push(segment?);
+            }

-        let len = list.len();
-        let range = if len <= most_recent_n {
-            0..len
-        } else {
-            0..most_recent_n
-        };
+            list.sort_unstable_by(|a, b| a.location.cmp(&b.location));

-        let mut output = Vec::new();
-        for item in &list[range] {
-            let bytes = self.object_store.get(&item.location).await?.bytes().await?;
-            output.push(serde_json::from_slice(&bytes)?);
+            let len = list.len();
+            let end = if len <= count { len } else { count };
+
+            for item in &list[0..end] {
+                let bytes = self.object_store.get(&item.location).await?.bytes().await?;
+                output.push(serde_json::from_slice(&bytes)?);
+            }
+
+            // Get the last path in the array to use as an offset. This assumes
+            // we sorted the list, as we can't otherwise guarantee the order of
+            // the list call to the object store.
+            offset = Some(list[end - 1].location.clone());
        }

        Ok(output)
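
Listing calls against object stores such as S3 commonly return at most 1000 entries per request, which is why the rewritten load_segments pages through the segment info files: it carves most_recent_n into chunks of at most 1000 and resumes each page from the last path of the previous, sorted page. Below is a minimal, self-contained sketch of that chunking-and-offset pattern, not the actual implementation: plain Strings stand in for ObjectMeta/ObjPath, a Vec stands in for the object store, and the helper names next_chunk and list_page are made up for the illustration.

    // Illustration only: strings stand in for object-store paths and a Vec for
    // the store itself. next_chunk mirrors the `count` computation in the loop
    // above; list_page mirrors list / list_with_offset plus the unstable sort
    // on `location`.

    /// Take the next page size out of the remaining request, at most 1000,
    /// the same way the loop above decrements most_recent_n.
    fn next_chunk(most_recent_n: &mut usize) -> usize {
        if *most_recent_n > 1000 {
            *most_recent_n -= 1000;
            1000
        } else {
            std::mem::replace(most_recent_n, 0)
        }
    }

    /// Return up to `limit` paths that sort strictly after `offset`, the way
    /// list_with_offset resumes from the last path already seen.
    fn list_page(store: &[String], offset: Option<&str>, limit: usize) -> Vec<String> {
        let mut page: Vec<String> = store
            .iter()
            .filter(|p| offset.map_or(true, |o| p.as_str() > o))
            .cloned()
            .collect();
        // Arrival order is not guaranteed, so sort by path before slicing.
        page.sort_unstable();
        page.truncate(limit);
        page
    }

    fn main() {
        // 2500 fake segment info file paths; real paths come from SegmentInfoFilePath.
        let store: Vec<String> = (0..2500).map(|i| format!("{i:08}.info.json")).collect();

        let mut wanted = 2200usize;
        let mut offset: Option<String> = None;
        let mut loaded = Vec::new();

        while wanted > 0 {
            let count = next_chunk(&mut wanted);
            let page = list_page(&store, offset.as_deref(), count);
            if page.is_empty() {
                break; // fewer files exist than were requested
            }
            // Resume the next page after the last path of this one.
            offset = page.last().cloned();
            loaded.extend(page);
        }

        assert_eq!(loaded.len(), 2200);
    }

In the diff above the same shape applies, except that list_with_offset returns everything stored after the offset and only the first count entries of the sorted result are fetched and deserialized.
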
@@ -405,6 +425,29 @@ async fn persist_and_load_segment_info_files_with_fewer_than_requested() {
    assert_eq!(segments[0].segment_id.0, 0);
}

+#[tokio::test]
+/// This test makes sure that the logic for offset lists works
+async fn persist_and_load_over_9000_segment_info_files() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    for id in 0..9001 {
+        let info_file = PersistedSegment {
+            segment_id: SegmentId::new(id),
+            segment_wal_size_bytes: 0,
+            databases: HashMap::new(),
+            segment_min_time: 0,
+            segment_max_time: 1,
+            segment_row_count: 0,
+            segment_parquet_size_bytes: 0,
+        };
+        persister.persist_segment(info_file).await.unwrap();
+    }
+    let segments = persister.load_segments(9500).await.unwrap();
+    // We asked for the most recent 9500 so there should be 9001 of them
+    assert_eq!(segments.len(), 9001);
+    assert_eq!(segments[0].segment_id.0, 9000);
+}
+
#[tokio::test]
async fn get_parquet_bytes() {
    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
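
A note on persist_and_load_over_9000_segment_info_files: it expects segments[0] to be the newest segment (id 9000) even though the loader sorts paths in ascending order, so the segment info file names must be chosen so that lexicographically smaller paths mean newer segments. The actual naming scheme lives in SegmentInfoFilePath; the snippet below only illustrates one common way to get that property (a zero-padded, inverted id), and info_file_name is a hypothetical helper, not part of the codebase.

    // Illustration only: with a zero-padded, inverted id, an ascending
    // lexicographic sort returns the newest segment first. The real scheme is
    // whatever SegmentInfoFilePath produces, not this helper.
    fn info_file_name(segment_id: u64) -> String {
        // Larger ids map to smaller strings, so the newest file sorts first.
        format!("{:020}.info.json", u64::MAX - segment_id)
    }

    fn main() {
        let mut names: Vec<(u64, String)> =
            (0..=9000u64).map(|id| (id, info_file_name(id))).collect();
        // Same comparison the loader applies to `location`.
        names.sort_unstable_by(|a, b| a.1.cmp(&b.1));
        assert_eq!(names[0].0, 9000); // the newest id comes out first, as the test expects
    }
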